blob: fe027f09b07a58db02ba56efee1d4b609efd12d4 [file] [log] [blame]
Charles-François Natali243d8d82013-09-04 19:02:49 +02001"""Selectors module.
2
3This module allows high-level and efficient I/O multiplexing, built upon the
4`select` module primitives.
5"""
6
7
from abc import ABCMeta, abstractmethod
from collections import namedtuple
import functools
import math
import select
import sys
13
14
# Backend-neutral event flags. Each selector implementation translates these
# to and from the constants of the primitive it is built on.
EVENT_READ = 0b01
EVENT_WRITE = 0b10
18
19
def _fileobj_to_fd(fileobj):
    """Resolve a file object to its underlying file descriptor.

    Parameters:
    fileobj -- an integer file descriptor, or any object exposing a
               `fileno()` method

    Returns:
    the corresponding file descriptor (never negative)

    Raises:
    ValueError -- if `fileobj` is not an integer and calling or converting
                  its `fileno()` fails, or if the descriptor is negative
    """
    if not isinstance(fileobj, int):
        try:
            descriptor = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            # Hide the low-level cause: callers only need to know that the
            # object cannot be used as a file object.
            message = "Invalid file object: {!r}".format(fileobj)
            raise ValueError(message) from None
    else:
        descriptor = fileobj

    if descriptor < 0:
        raise ValueError("Invalid file descriptor: {}".format(descriptor))
    return descriptor
40
41
# Immutable record associating a registered file object with its backing file
# descriptor, the event mask selected for it, and the data attached to it.
SelectorKey = namedtuple('SelectorKey', ('fileobj', 'fd', 'events', 'data'))
45
46
class BaseSelector(metaclass=ABCMeta):
    """Base selector class.

    A selector supports registering file objects to be monitored for specific
    I/O events.

    A file object is a file descriptor or any object with a `fileno()` method.
    An arbitrary object can be attached to the file object, which can be used
    for example to store context information, a callback, etc.

    A selector can use various implementations (select(), poll(), epoll()...)
    depending on the platform. The `DefaultSelector` alias refers to the most
    performant implementation available on the current platform.
    """

    def __init__(self):
        # this maps file descriptors to keys
        self._fd_to_key = {}

    @staticmethod
    def _check_events(events):
        """Raise ValueError unless `events` is a non-empty bitwise mask made
        only of EVENT_READ and/or EVENT_WRITE."""
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError("Invalid events: {!r}".format(events))

    def register(self, fileobj, events, data=None):
        """Register a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError -- if the event mask or the file object is invalid
        KeyError   -- if the file descriptor is already registered
        """
        self._check_events(events)

        key = SelectorKey(fileobj, _fileobj_to_fd(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already "
                           "registered".format(fileobj, key.fd))

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        """Unregister a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        ValueError -- if the file object is invalid
        KeyError   -- if the file object is not registered
        """
        try:
            key = self._fd_to_key.pop(_fileobj_to_fd(fileobj))
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return key

    def modify(self, fileobj, events, data=None):
        """Change a registered file object monitored events or attached data.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError -- if the event mask or the file object is invalid; the
                      existing registration is left untouched in that case
        KeyError   -- if the file object is not registered
        """
        try:
            key = self._fd_to_key[_fileobj_to_fd(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

        if events != key.events:
            # Validate *before* unregistering: otherwise a bad mask would
            # make register() fail after the old registration is gone,
            # silently dropping the file object from the selector.
            self._check_events(events)
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)
        elif data != key.data:
            # Only the attached data changed: the monitored events are the
            # same, so swap the key in place instead of going through a
            # full unregister()/register() cycle.
            key = key._replace(fileobj=fileobj, data=data)
            self._fd_to_key[key.fd] = key
        return key

    @abstractmethod
    def select(self, timeout=None):
        """Perform the actual selection, until some monitored file objects are
        ready or a timeout expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError()

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is freed.
        """
        self._fd_to_key.clear()

    def get_key(self, fileobj):
        """Return the key associated to a registered file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey for this file object

        Raises:
        ValueError -- if the file object is invalid
        KeyError   -- if the file object is not registered
        """
        try:
            return self._fd_to_key[_fileobj_to_fd(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    def __enter__(self):
        # Context-manager support: the selector itself is the managed object.
        return self

    def __exit__(self, *args):
        # Always release the selector's resources when leaving the block.
        self.close()

    def _key_from_fd(self, fd):
        """Return the key associated to a given file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        try:
            return self._fd_to_key[fd]
        except KeyError:
            return None
184
185
class SelectSelector(BaseSelector):
    """Selector built on top of `select.select()`."""

    def __init__(self):
        super().__init__()
        # File descriptors currently monitored for reading / for writing.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        """Register `fileobj` and record its descriptor in the reader and/or
        writer set, according to `events`. Returns the new SelectorKey."""
        key = super().register(fileobj, events, data)
        for flag, watched in ((EVENT_READ, self._readers),
                              (EVENT_WRITE, self._writers)):
            if events & flag:
                watched.add(key.fd)
        return key

    def unregister(self, fileobj):
        """Unregister `fileobj` and forget its descriptor in both sets.
        Returns the removed SelectorKey."""
        key = super().unregister(fileobj)
        for watched in (self._readers, self._writers):
            watched.discard(key.fd)
        return key

    if sys.platform != 'win32':
        _select = select.select
    else:
        def _select(self, r, w, _, timeout=None):
            # On Windows the writers are also passed as the "exceptional"
            # set, and whatever comes back there is reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []

    def select(self, timeout=None):
        """Wait until a monitored descriptor is ready or `timeout` expires.

        A negative timeout is treated as 0 (non-blocking); None blocks.
        Returns a list of (key, events) pairs, where `events` is restricted
        to the events the key was registered for. An empty list is returned
        if the call is interrupted (InterruptedError).
        """
        if timeout is not None:
            timeout = max(timeout, 0)
        ready = []
        try:
            readable, writable, _ = self._select(self._readers,
                                                 self._writers, [], timeout)
        except InterruptedError:
            return ready

        readable = set(readable)
        writable = set(writable)
        for fd in readable | writable:
            key = self._key_from_fd(fd)
            if not key:
                continue
            mask = ((EVENT_READ if fd in readable else 0)
                    | (EVENT_WRITE if fd in writable else 0))
            ready.append((key, mask & key.events))
        return ready
235
236
if hasattr(select, 'poll'):

    class PollSelector(BaseSelector):
        """Poll-based selector."""

        def __init__(self):
            super().__init__()
            self._poll = select.poll()

        def register(self, fileobj, events, data=None):
            """Register `fileobj` with the selector and the poll object.

            EVENT_READ maps to POLLIN and EVENT_WRITE to POLLOUT. Returns the
            new SelectorKey. If the poll object rejects the descriptor, the
            key is removed again before the exception propagates.
            """
            key = super().register(fileobj, events, data)
            poll_events = 0
            if events & EVENT_READ:
                poll_events |= select.POLLIN
            if events & EVENT_WRITE:
                poll_events |= select.POLLOUT
            try:
                self._poll.register(key.fd, poll_events)
            except BaseException:
                # Keep _fd_to_key in sync with the poll object: do not leave
                # behind a key for a descriptor that is not being polled.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """Unregister `fileobj` from the selector and the poll object.
            Returns the removed SelectorKey."""
            key = super().unregister(fileobj)
            self._poll.unregister(key.fd)
            return key

        def select(self, timeout=None):
            """Wait until a monitored descriptor is ready or `timeout`
            (seconds) expires.

            None blocks indefinitely; a timeout <= 0 does not block.
            Returns a list of (key, events) pairs, where `events` is
            restricted to the events the key was registered for. An empty
            list is returned if the call is interrupted (InterruptedError).
            """
            if timeout is None:
                timeout_ms = None
            elif timeout <= 0:
                timeout_ms = 0
            else:
                # poll() takes milliseconds. Round *up* so that a positive
                # sub-millisecond timeout is not truncated to 0, which would
                # turn a blocking wait into a busy loop.
                timeout_ms = math.ceil(timeout * 1e3)
            ready = []
            try:
                fd_event_list = self._poll.poll(timeout_ms)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Any flag other than POLLIN (POLLOUT, but also e.g. error
                # conditions) counts as writable, and any flag other than
                # POLLOUT as readable; the mask below then keeps only what
                # the key was registered for.
                if event & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready
279
280
if hasattr(select, 'epoll'):

    class EpollSelector(BaseSelector):
        """Epoll-based selector."""

        def __init__(self):
            super().__init__()
            self._epoll = select.epoll()

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._epoll.fileno()

        def register(self, fileobj, events, data=None):
            """Register `fileobj` with the selector and the epoll object.

            EVENT_READ maps to EPOLLIN and EVENT_WRITE to EPOLLOUT. Returns
            the new SelectorKey. If the epoll object rejects the descriptor,
            the key is removed again before the exception propagates.
            """
            key = super().register(fileobj, events, data)
            epoll_events = 0
            if events & EVENT_READ:
                epoll_events |= select.EPOLLIN
            if events & EVENT_WRITE:
                epoll_events |= select.EPOLLOUT
            try:
                self._epoll.register(key.fd, epoll_events)
            except BaseException:
                # Keep _fd_to_key in sync with the epoll object: do not
                # leave behind a key for a descriptor epoll refused.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """Unregister `fileobj` from the selector and the epoll object.
            Returns the removed SelectorKey."""
            key = super().unregister(fileobj)
            try:
                self._epoll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it was
                # registered; the selector's own bookkeeping is already
                # updated, so there is nothing left to undo.
                pass
            return key

        def select(self, timeout=None):
            """Wait until a monitored descriptor is ready or `timeout`
            (seconds) expires.

            None blocks indefinitely; a timeout <= 0 does not block.
            Returns a list of (key, events) pairs, where `events` is
            restricted to the events the key was registered for. An empty
            list is returned if the call is interrupted (InterruptedError).
            """
            if timeout is None:
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond: round up to
                # a whole number of milliseconds so that a tiny positive
                # timeout cannot degrade into a non-blocking call.
                timeout = math.ceil(timeout * 1e3) * 1e-3
            # maxevents must be greater than zero, otherwise epoll.poll()
            # raises ValueError; clamping keeps select() usable when no file
            # object is registered.
            max_ev = max(len(self._fd_to_key), 1)
            ready = []
            try:
                fd_event_list = self._epoll.poll(timeout, max_ev)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Any flag other than EPOLLIN counts as writable and any
                # flag other than EPOLLOUT as readable; the mask below then
                # keeps only what the key was registered for.
                if event & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Forget all registrations and close the epoll object."""
            super().close()
            self._epoll.close()
331
332
if hasattr(select, 'kqueue'):

    class KqueueSelector(BaseSelector):
        """Kqueue-based selector."""

        def __init__(self):
            super().__init__()
            self._kqueue = select.kqueue()

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._kqueue.fileno()

        def register(self, fileobj, events, data=None):
            """Register `fileobj` with the selector and the kqueue object.

            One kevent filter is added per requested event (KQ_FILTER_READ
            for EVENT_READ, KQ_FILTER_WRITE for EVENT_WRITE). Returns the new
            SelectorKey. If the kqueue rejects a filter, the key is removed
            again before the exception propagates.
            """
            key = super().register(fileobj, events, data)
            try:
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
            except BaseException:
                # Do not keep tracking a descriptor the kqueue refused.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """Unregister `fileobj` from the selector and delete the kevent
            filters that were added for it. Returns the removed SelectorKey.
            """
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it was
                    # registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # See the comment above.
                    pass
            return key

        def select(self, timeout=None):
            """Wait until a monitored descriptor is ready or `timeout`
            (seconds) expires.

            None blocks indefinitely; a timeout <= 0 does not block.
            Returns a list of (key, events) pairs, one per reported kevent,
            where `events` is restricted to the events the key was
            registered for. An empty list is returned if the call is
            interrupted (InterruptedError).
            """
            timeout = None if timeout is None else max(timeout, 0)
            # A descriptor registered for both reading and writing has two
            # filters and can therefore yield two kevents, hence the factor
            # of 2. The lower bound of 1 matters as well: with max_events
            # set to 0, kqueue returns immediately and ignores the timeout.
            max_ev = max(2 * len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = self._kqueue.control(None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Forget all registrations and close the kqueue object."""
            super().close()
            self._kqueue.close()
394
395
# Pick the best implementation defined above; the order of preference is
# kqueue, then epoll, then poll, with select() as the last resort.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024).
# The optional classes only exist when the platform provides the matching
# primitive, hence the lookups by name.
DefaultSelector = (
    globals().get('KqueueSelector')
    or globals().get('EpollSelector')
    or globals().get('PollSelector')
    or SelectSelector
)