"""
This is a select module for use on JVMs > 1.5.
It is documented, along with known issues and workarounds, on the jython wiki.
http://wiki.python.org/jython/SelectModule
"""
6
7import java.nio.channels.SelectableChannel
8import java.nio.channels.SelectionKey
9import java.nio.channels.Selector
10from java.nio.channels.SelectionKey import OP_ACCEPT, OP_CONNECT, OP_WRITE, OP_READ
11
12import errno
13import os
14import Queue
15import socket
16
class error(Exception):
    """Exception type raised by this module for select/poll failures."""
18
# Default "circumstance" value, used as the second element of _exception_map keys.
ALL = None

# Lookup table used by _map_exception to translate java exceptions.
_exception_map = {

# (<javaexception>, <circumstance>) : <python exception instance reported for that java exception>

(java.nio.channels.IllegalBlockingModeException, ALL) : error(errno.ESOCKISBLOCKING, 'socket must be in non-blocking mode'),
}
27
def _map_exception(exc, circumstance=ALL):
    """Translate a java exception into the python exception to raise.

    exc: the java exception instance that was caught.
    circumstance: second element of the _exception_map key; defaults to ALL.

    Returns a python exception (it is returned, not raised). When the pair
    (exc.__class__, circumstance) has an entry in _exception_map, the result
    is a new exception of the mapped type carrying the mapped arguments, with
    the original java exception attached as its `java_exception` attribute.
    Otherwise an `error` with code -1 describing the unmapped exception is
    returned.
    """
    template = _exception_map.get((exc.__class__, circumstance))
    if template is None:
        return error(-1, 'Unmapped java exception: <%s:%s>' % (exc.toString(), circumstance))
    # Build a fresh exception for every call. The instance stored in the map
    # is shared module-wide, so attaching java_exception to it directly would
    # let a later (or concurrent) failure overwrite the details of this one.
    mapped_exception = template.__class__(*template.args)
    mapped_exception.java_exception = exc
    return mapped_exception
35
# Event bits that are translated to and from java selection operations.
POLLIN = 0x01
POLLOUT = 0x02

# The following event types are completely ignored on jython.
# Java does not support them, AFAICT.
# They are declared only to support code compatibility with cpython.

POLLPRI = 0x04
POLLERR = 0x08
POLLHUP = 0x10
POLLNVAL = 0x20
47
48def _getselectable(selectable_object):
49 try:
50 channel = selectable_object.getchannel()
51 except:
52 try:
53 channel = selectable_object.fileno().getChannel()
54 except:
55 raise TypeError("Object '%s' is not watchable" % selectable_object,
56 errno.ENOTSOCK)
57
58 if channel and not isinstance(channel, java.nio.channels.SelectableChannel):
59 raise TypeError("Object '%s' is not watchable" % selectable_object,
60 errno.ENOTSOCK)
61 return channel
62
63class poll:
64
65 def __init__(self):
66 self.selector = java.nio.channels.Selector.open()
67 self.chanmap = {}
68 self.unconnected_sockets = []
69
70 def _register_channel(self, socket_object, channel, mask):
71 jmask = 0
72 if mask & POLLIN:
73 # Note that OP_READ is NOT a valid event on server socket channels.
74 if channel.validOps() & OP_ACCEPT:
75 jmask = OP_ACCEPT
76 else:
77 jmask = OP_READ
78 if mask & POLLOUT:
79 if channel.validOps() & OP_WRITE:
80 jmask |= OP_WRITE
81 if channel.validOps() & OP_CONNECT:
82 jmask |= OP_CONNECT
83 selectionkey = channel.register(self.selector, jmask)
84 self.chanmap[channel] = (socket_object, selectionkey)
85
86 def _check_unconnected_sockets(self):
87 temp_list = []
88 for socket_object, mask in self.unconnected_sockets:
89 channel = _getselectable(socket_object)
90 if channel is not None:
91 self._register_channel(socket_object, channel, mask)
92 else:
93 temp_list.append( (socket_object, mask) )
94 self.unconnected_sockets = temp_list
95
96 def register(self, socket_object, mask = POLLIN|POLLOUT|POLLPRI):
97 try:
98 channel = _getselectable(socket_object)
99 if channel is None:
100 # The socket is not yet connected, and thus has no channel
101 # Add it to a pending list, and return
102 self.unconnected_sockets.append( (socket_object, mask) )
103 return
104 self._register_channel(socket_object, channel, mask)
105 except java.lang.Exception, jlx:
106 raise _map_exception(jlx)
107
108 def unregister(self, socket_object):
109 try:
110 channel = _getselectable(socket_object)
111 self.chanmap[channel][1].cancel()
112 del self.chanmap[channel]
113 except java.lang.Exception, jlx:
114 raise _map_exception(jlx)
115
116 def _dopoll(self, timeout):
117 if timeout is None or timeout < 0:
118 self.selector.select()
119 else:
120 try:
121 timeout = int(timeout)
122 if not timeout:
123 self.selector.selectNow()
124 else:
125 # No multiplication required: both cpython and java use millisecond timeouts
126 self.selector.select(timeout)
127 except ValueError, vx:
128 raise error("poll timeout must be a number of milliseconds or None", errno.EINVAL)
129 # The returned selectedKeys cannot be used from multiple threads!
130 return self.selector.selectedKeys()
131
132 def poll(self, timeout=None):
133 try:
134 self._check_unconnected_sockets()
135 selectedkeys = self._dopoll(timeout)
136 results = []
137 for k in selectedkeys.iterator():
138 jmask = k.readyOps()
139 pymask = 0
140 if jmask & OP_READ: pymask |= POLLIN
141 if jmask & OP_WRITE: pymask |= POLLOUT
142 if jmask & OP_ACCEPT: pymask |= POLLIN
143 if jmask & OP_CONNECT: pymask |= POLLOUT
144 # Now return the original userobject, and the return event mask
145 results.append( (self.chanmap[k.channel()][0], pymask) )
146 return results
147 except java.lang.Exception, jlx:
148 raise _map_exception(jlx)
149
150 def _deregister_all(self):
151 try:
152 for k in self.selector.keys():
153 k.cancel()
154 # Keys are not actually removed from the selector until the next select operation.
155 self.selector.selectNow()
156 except java.lang.Exception, jlx:
157 raise _map_exception(jlx)
158
159 def close(self):
160 try:
161 self._deregister_all()
162 self.selector.close()
163 except java.lang.Exception, jlx:
164 raise _map_exception(jlx)
165
166def _calcselecttimeoutvalue(value):
167 if value is None:
168 return None
169 try:
170 floatvalue = float(value)
171 except Exception, x:
172 raise TypeError("Select timeout value must be a number or None")
173 if value < 0:
174 raise error("Select timeout value cannot be negative", errno.EINVAL)
175 if floatvalue < 0.000001:
176 return 0
177 return int(floatvalue * 1000) # Convert to milliseconds
178
179# This cache for poll objects is required because of a bug in java on MS Windows
180# http://bugs.jython.org/issue1291
181
class poll_object_cache:
    """Hands out poll objects, recycling them through a queue on MS Windows.

    When os._name is 'nt', released poll objects are deregistered and queued
    for reuse, and the queue is drained and closed at interpreter exit.
    On every other platform each request gets a new poll object, and a
    released poll object is simply closed.
    """

    def __init__(self):
        self.is_windows = (os._name == 'nt')
        if not self.is_windows:
            return
        self.poll_object_queue = Queue.Queue()
        import atexit
        atexit.register(self.finalize)

    def get_poll_object(self):
        """Return a queued poll object if one is available, else a new one."""
        if self.is_windows:
            try:
                return self.poll_object_queue.get(False)
            except Queue.Empty:
                pass
        return poll()

    def release_poll_object(self, pobj):
        """Give `pobj` back: queue it for reuse on windows, close it elsewhere."""
        if not self.is_windows:
            pobj.close()
            return
        pobj._deregister_all()
        self.poll_object_queue.put(pobj)

    def finalize(self):
        """Close every poll object still waiting in the queue (windows only)."""
        if not self.is_windows:
            return
        try:
            while True:
                self.poll_object_queue.get(False).close()
        except Queue.Empty:
            return
214
# Module-wide cache instance; native_select() obtains and releases its poll objects through it.
_poll_object_cache = poll_object_cache()
216
def native_select(read_fd_list, write_fd_list, outofband_fd_list, timeout=None):
    """Wait until some of the given objects are ready for reading or writing.

    read_fd_list / write_fd_list: objects to watch for input / output.
    outofband_fd_list: accepted but not examined.
    timeout: seconds (converted by _calcselecttimeoutvalue), or None.

    Returns a (read_ready, write_ready, oob_ready) tuple of lists; the third
    list is always empty. The poll object used for the wait is handed back
    to the poll object cache even when an exception is raised.
    """
    timeout = _calcselecttimeoutvalue(timeout)
    # A poll object does the actual watching.
    pobj = _poll_object_cache.get_poll_object()
    try:
        read_interest = {}
        for readable in read_fd_list:
            pobj.register(readable, POLLIN)
            read_interest[readable] = 1
        for writable in write_fd_list:
            events = POLLOUT
            if writable in read_interest:
                # registering a second time overwrites the first,
                # so ask for both directions at once
                events |= POLLIN
            pobj.register(writable, events)
        ready = pobj.poll(timeout)
        # Split the poll results into the per-direction lists
        read_ready_list = [fd for fd, mask in ready if mask & POLLIN]
        write_ready_list = [fd for fd, mask in ready if mask & POLLOUT]
        return read_ready_list, write_ready_list, []
    finally:
        _poll_object_cache.release_poll_object(pobj)
245
# Bind the module-level name `select` to the native implementation.
select = native_select
247
def cpython_compatible_select(read_fd_list, write_fd_list, outofband_fd_list, timeout=None):
    """Run native_select on objects that may currently be in blocking mode.

    Every channel found in blocking mode is switched to non-blocking for the
    duration of the call and switched back afterwards, even if the select
    raises. Arguments and return value are those of native_select.
    """
    # First turn all sockets to non-blocking
    # keeping track of which ones have changed
    modified_channels = []
    try:
        for socket_list in (read_fd_list, write_fd_list, outofband_fd_list):
            for s in socket_list:
                channel = _getselectable(s)
                if channel is None:
                    # Not yet connected, so there is no channel to reconfigure;
                    # native_select keeps such sockets on its pending list.
                    continue
                if channel.isBlocking():
                    modified_channels.append(channel)
                    channel.configureBlocking(0)
        return native_select(read_fd_list, write_fd_list, outofband_fd_list, timeout)
    finally:
        for channel in modified_channels:
            channel.configureBlocking(1)
262 channel.configureBlocking(1)