| Tor Norbye | 3a2425a | 2013-11-04 10:16:08 -0800 | [diff] [blame^] | 1 | """ |
| 2 | This is an select module for use on JVMs > 1.5. |
| 3 | It is documented, along with known issues and workarounds, on the jython wiki. |
| 4 | http://wiki.python.org/jython/SelectModule |
| 5 | """ |
| 6 | |
| 7 | import java.nio.channels.SelectableChannel |
| 8 | import java.nio.channels.SelectionKey |
| 9 | import java.nio.channels.Selector |
| 10 | from java.nio.channels.SelectionKey import OP_ACCEPT, OP_CONNECT, OP_WRITE, OP_READ |
| 11 | |
| 12 | import errno |
| 13 | import os |
| 14 | import Queue |
| 15 | import socket |
| 16 | |
| 17 | class error(Exception): pass |
| 18 | |
| 19 | ALL = None |
| 20 | |
| 21 | _exception_map = { |
| 22 | |
| 23 | # (<javaexception>, <circumstance>) : lambda: <code that raises the python equivalent> |
| 24 | |
| 25 | (java.nio.channels.IllegalBlockingModeException, ALL) : error(errno.ESOCKISBLOCKING, 'socket must be in non-blocking mode'), |
| 26 | } |
| 27 | |
| 28 | def _map_exception(exc, circumstance=ALL): |
| 29 | try: |
| 30 | mapped_exception = _exception_map[(exc.__class__, circumstance)] |
| 31 | mapped_exception.java_exception = exc |
| 32 | return mapped_exception |
| 33 | except KeyError: |
| 34 | return error(-1, 'Unmapped java exception: <%s:%s>' % (exc.toString(), circumstance)) |
| 35 | |
| 36 | POLLIN = 1 |
| 37 | POLLOUT = 2 |
| 38 | |
| 39 | # The following event types are completely ignored on jython |
| 40 | # Java does not support them, AFAICT |
| 41 | # They are declared only to support code compatibility with cpython |
| 42 | |
| 43 | POLLPRI = 4 |
| 44 | POLLERR = 8 |
| 45 | POLLHUP = 16 |
| 46 | POLLNVAL = 32 |
| 47 | |
| 48 | def _getselectable(selectable_object): |
| 49 | try: |
| 50 | channel = selectable_object.getchannel() |
| 51 | except: |
| 52 | try: |
| 53 | channel = selectable_object.fileno().getChannel() |
| 54 | except: |
| 55 | raise TypeError("Object '%s' is not watchable" % selectable_object, |
| 56 | errno.ENOTSOCK) |
| 57 | |
| 58 | if channel and not isinstance(channel, java.nio.channels.SelectableChannel): |
| 59 | raise TypeError("Object '%s' is not watchable" % selectable_object, |
| 60 | errno.ENOTSOCK) |
| 61 | return channel |
| 62 | |
| 63 | class poll: |
| 64 | |
| 65 | def __init__(self): |
| 66 | self.selector = java.nio.channels.Selector.open() |
| 67 | self.chanmap = {} |
| 68 | self.unconnected_sockets = [] |
| 69 | |
| 70 | def _register_channel(self, socket_object, channel, mask): |
| 71 | jmask = 0 |
| 72 | if mask & POLLIN: |
| 73 | # Note that OP_READ is NOT a valid event on server socket channels. |
| 74 | if channel.validOps() & OP_ACCEPT: |
| 75 | jmask = OP_ACCEPT |
| 76 | else: |
| 77 | jmask = OP_READ |
| 78 | if mask & POLLOUT: |
| 79 | if channel.validOps() & OP_WRITE: |
| 80 | jmask |= OP_WRITE |
| 81 | if channel.validOps() & OP_CONNECT: |
| 82 | jmask |= OP_CONNECT |
| 83 | selectionkey = channel.register(self.selector, jmask) |
| 84 | self.chanmap[channel] = (socket_object, selectionkey) |
| 85 | |
| 86 | def _check_unconnected_sockets(self): |
| 87 | temp_list = [] |
| 88 | for socket_object, mask in self.unconnected_sockets: |
| 89 | channel = _getselectable(socket_object) |
| 90 | if channel is not None: |
| 91 | self._register_channel(socket_object, channel, mask) |
| 92 | else: |
| 93 | temp_list.append( (socket_object, mask) ) |
| 94 | self.unconnected_sockets = temp_list |
| 95 | |
| 96 | def register(self, socket_object, mask = POLLIN|POLLOUT|POLLPRI): |
| 97 | try: |
| 98 | channel = _getselectable(socket_object) |
| 99 | if channel is None: |
| 100 | # The socket is not yet connected, and thus has no channel |
| 101 | # Add it to a pending list, and return |
| 102 | self.unconnected_sockets.append( (socket_object, mask) ) |
| 103 | return |
| 104 | self._register_channel(socket_object, channel, mask) |
| 105 | except java.lang.Exception, jlx: |
| 106 | raise _map_exception(jlx) |
| 107 | |
| 108 | def unregister(self, socket_object): |
| 109 | try: |
| 110 | channel = _getselectable(socket_object) |
| 111 | self.chanmap[channel][1].cancel() |
| 112 | del self.chanmap[channel] |
| 113 | except java.lang.Exception, jlx: |
| 114 | raise _map_exception(jlx) |
| 115 | |
| 116 | def _dopoll(self, timeout): |
| 117 | if timeout is None or timeout < 0: |
| 118 | self.selector.select() |
| 119 | else: |
| 120 | try: |
| 121 | timeout = int(timeout) |
| 122 | if not timeout: |
| 123 | self.selector.selectNow() |
| 124 | else: |
| 125 | # No multiplication required: both cpython and java use millisecond timeouts |
| 126 | self.selector.select(timeout) |
| 127 | except ValueError, vx: |
| 128 | raise error("poll timeout must be a number of milliseconds or None", errno.EINVAL) |
| 129 | # The returned selectedKeys cannot be used from multiple threads! |
| 130 | return self.selector.selectedKeys() |
| 131 | |
| 132 | def poll(self, timeout=None): |
| 133 | try: |
| 134 | self._check_unconnected_sockets() |
| 135 | selectedkeys = self._dopoll(timeout) |
| 136 | results = [] |
| 137 | for k in selectedkeys.iterator(): |
| 138 | jmask = k.readyOps() |
| 139 | pymask = 0 |
| 140 | if jmask & OP_READ: pymask |= POLLIN |
| 141 | if jmask & OP_WRITE: pymask |= POLLOUT |
| 142 | if jmask & OP_ACCEPT: pymask |= POLLIN |
| 143 | if jmask & OP_CONNECT: pymask |= POLLOUT |
| 144 | # Now return the original userobject, and the return event mask |
| 145 | results.append( (self.chanmap[k.channel()][0], pymask) ) |
| 146 | return results |
| 147 | except java.lang.Exception, jlx: |
| 148 | raise _map_exception(jlx) |
| 149 | |
| 150 | def _deregister_all(self): |
| 151 | try: |
| 152 | for k in self.selector.keys(): |
| 153 | k.cancel() |
| 154 | # Keys are not actually removed from the selector until the next select operation. |
| 155 | self.selector.selectNow() |
| 156 | except java.lang.Exception, jlx: |
| 157 | raise _map_exception(jlx) |
| 158 | |
| 159 | def close(self): |
| 160 | try: |
| 161 | self._deregister_all() |
| 162 | self.selector.close() |
| 163 | except java.lang.Exception, jlx: |
| 164 | raise _map_exception(jlx) |
| 165 | |
| 166 | def _calcselecttimeoutvalue(value): |
| 167 | if value is None: |
| 168 | return None |
| 169 | try: |
| 170 | floatvalue = float(value) |
| 171 | except Exception, x: |
| 172 | raise TypeError("Select timeout value must be a number or None") |
| 173 | if value < 0: |
| 174 | raise error("Select timeout value cannot be negative", errno.EINVAL) |
| 175 | if floatvalue < 0.000001: |
| 176 | return 0 |
| 177 | return int(floatvalue * 1000) # Convert to milliseconds |
| 178 | |
| 179 | # This cache for poll objects is required because of a bug in java on MS Windows |
| 180 | # http://bugs.jython.org/issue1291 |
| 181 | |
| 182 | class poll_object_cache: |
| 183 | |
| 184 | def __init__(self): |
| 185 | self.is_windows = os._name == 'nt' |
| 186 | if self.is_windows: |
| 187 | self.poll_object_queue = Queue.Queue() |
| 188 | import atexit |
| 189 | atexit.register(self.finalize) |
| 190 | |
| 191 | def get_poll_object(self): |
| 192 | if not self.is_windows: |
| 193 | return poll() |
| 194 | try: |
| 195 | return self.poll_object_queue.get(False) |
| 196 | except Queue.Empty: |
| 197 | return poll() |
| 198 | |
| 199 | def release_poll_object(self, pobj): |
| 200 | if self.is_windows: |
| 201 | pobj._deregister_all() |
| 202 | self.poll_object_queue.put(pobj) |
| 203 | else: |
| 204 | pobj.close() |
| 205 | |
| 206 | def finalize(self): |
| 207 | if self.is_windows: |
| 208 | while True: |
| 209 | try: |
| 210 | p = self.poll_object_queue.get(False) |
| 211 | p.close() |
| 212 | except Queue.Empty: |
| 213 | return |
| 214 | |
| 215 | _poll_object_cache = poll_object_cache() |
| 216 | |
| 217 | def native_select(read_fd_list, write_fd_list, outofband_fd_list, timeout=None): |
| 218 | timeout = _calcselecttimeoutvalue(timeout) |
| 219 | # First create a poll object to do the actual watching. |
| 220 | pobj = _poll_object_cache.get_poll_object() |
| 221 | try: |
| 222 | registered_for_read = {} |
| 223 | # Check the read list |
| 224 | for fd in read_fd_list: |
| 225 | pobj.register(fd, POLLIN) |
| 226 | registered_for_read[fd] = 1 |
| 227 | # And now the write list |
| 228 | for fd in write_fd_list: |
| 229 | if fd in registered_for_read: |
| 230 | # registering a second time overwrites the first |
| 231 | pobj.register(fd, POLLIN|POLLOUT) |
| 232 | else: |
| 233 | pobj.register(fd, POLLOUT) |
| 234 | results = pobj.poll(timeout) |
| 235 | # Now start preparing the results |
| 236 | read_ready_list, write_ready_list, oob_ready_list = [], [], [] |
| 237 | for fd, mask in results: |
| 238 | if mask & POLLIN: |
| 239 | read_ready_list.append(fd) |
| 240 | if mask & POLLOUT: |
| 241 | write_ready_list.append(fd) |
| 242 | return read_ready_list, write_ready_list, oob_ready_list |
| 243 | finally: |
| 244 | _poll_object_cache.release_poll_object(pobj) |
| 245 | |
| 246 | select = native_select |
| 247 | |
| 248 | def cpython_compatible_select(read_fd_list, write_fd_list, outofband_fd_list, timeout=None): |
| 249 | # First turn all sockets to non-blocking |
| 250 | # keeping track of which ones have changed |
| 251 | modified_channels = [] |
| 252 | try: |
| 253 | for socket_list in [read_fd_list, write_fd_list, outofband_fd_list]: |
| 254 | for s in socket_list: |
| 255 | channel = _getselectable(s) |
| 256 | if channel.isBlocking(): |
| 257 | modified_channels.append(channel) |
| 258 | channel.configureBlocking(0) |
| 259 | return native_select(read_fd_list, write_fd_list, outofband_fd_list, timeout) |
| 260 | finally: |
| 261 | for channel in modified_channels: |
| 262 | channel.configureBlocking(1) |