"""Async sockets, built on top of Sam Rushing's excellent async library"""
| 2 | |
| 3 | import asyncore |
| 4 | import socket |
| 5 | from socket import AF_INET, SOCK_STREAM |
| 6 | import string |
| 7 | import cStringIO |
| 8 | import mimetools |
| 9 | import httplib |
| 10 | |
| 11 | |
__version__ = "0.9"
__author__ = "jvr"

# Maximum number of bytes requested from the socket per handle_read() call
# and the largest slice of the output buffer written per handle_write() call.
BUFSIZE = 512

# Diagnostic verbosity.  Any true value prints server start-up, incoming
# connection, log and proxy-request messages; values above 2 additionally
# dump every chunk of incoming and outgoing data.
VERBOSE = 1
| 18 | |
class Server(asyncore.dispatcher):

    """Generic asynchronous server class.

    Listens on a TCP port and instantiates ``handler_class`` once for every
    accepted connection.
    """

    def __init__(self, port, handler_class, backlog=1, host=""):
        """arguments:
        - port: the port to listen to
        - handler_class: class to handle requests; it is called with the
          accepted socket as its only argument
        - backlog: queue size for pending connections, passed straight to
          listen() (optional, see socket docs)
        - host: host name (optional: can be empty to use default host name)
        """
        if VERBOSE:
            print("Starting %s" % self.__class__.__name__)
        self.handler_class = handler_class
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind((host, port))
        self.listen(backlog)

    def handle_accept(self):
        """Accept a pending connection and hand its socket to a new handler."""
        pair = self.accept()
        if pair is None:
            # asyncore documents that accept() may return None when the
            # connection did not take place; ignore the event and keep
            # listening instead of failing on the tuple unpacking below.
            return
        conn, addr = pair
        if VERBOSE:
            print('Incoming Connection from %s:%d' % addr)
        self.handler_class(conn)
| 43 | |
| 44 | |
class ProxyServer(Server):

    """Generic proxy server class.

    Like Server, but every handler is additionally given the address of the
    remote host to relay to and the close-partner policy.
    """

    def __init__(self, port, handler_class, proxyaddr=None, closepartners=0):
        """arguments:
        - port: the port to listen to
        - handler_class: proxy class to handle requests
        - proxyaddr: a tuple containing the address and
          port of a remote host to connect to (optional)
        - closepartners: boolean, specifies whether we should close
          all proxy connections or not (optional). http seems to *not*
          want this, but telnet does...
        """
        # Always listens with a backlog of 1 on the default host.
        Server.__init__(self, port, handler_class, 1, "")
        self.proxyaddr = proxyaddr
        self.closepartners = closepartners

    def handle_accept(self):
        """Accept a pending connection and wrap it in a proxy handler."""
        pair = self.accept()
        if pair is None:
            # asyncore documents that accept() may return None when the
            # connection did not take place; ignore the event and keep
            # listening instead of failing on the tuple unpacking below.
            return
        conn, addr = pair
        if VERBOSE:
            print('Incoming Connection from %s:%d' % addr)
        self.handler_class(conn, self.proxyaddr, closepartner=self.closepartners)
| 68 | |
| 69 | |
class Connection(asyncore.dispatcher):

    """Generic connection class.

    Outgoing data is queued by send() and written from handle_write();
    incoming data is either buffered for recv() or passed to ``readfunc``.
    """

    def __init__(self, sock_or_address=None, readfunc=None, terminator=None):
        """arguments:
        - sock_or_address: either a socket, or a tuple containing the name
          and port number of a remote host
        - readfunc: callback function (optional). Will be called whenever
          there is some data available, or when an appropriate terminator
          is found.
        - terminator: string which specifies when a read is complete (optional)"""
        self._out_buffer = ""
        self._in_buffer = ""
        self.readfunc = readfunc
        self.terminator = terminator
        asyncore.dispatcher.__init__(self)
        if hasattr(sock_or_address, "fileno"):
            # Anything with a fileno() is treated as an existing socket.
            self.set_socket(sock_or_address)
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setblocking(0)
            self.set_socket(sock)
            if sock_or_address:
                self.connect(sock_or_address)

    # public methods
    def send(self, data):
        """Queue data for sending; the actual write happens in handle_write()."""
        self._out_buffer = self._out_buffer + data

    def recv(self, bytes=-1):
        """Remove and return up to ``bytes`` bytes of buffered input.

        The default of -1 returns everything currently buffered.
        """
        if bytes == -1:
            bytes = len(self._in_buffer)
        data = self._in_buffer[:bytes]
        self._in_buffer = self._in_buffer[bytes:]
        return data

    def set_terminator(self, terminator):
        """Change the string that marks the end of a read."""
        self.terminator = terminator

    # override this if you want to control the incoming data stream
    def handle_incoming_data(self, data):
        """Dispatch freshly received data.

        Without a readfunc the data is buffered for recv().  With a readfunc
        but no terminator, everything buffered plus the new data is passed to
        the callback.  With both, the callback is invoked once for every
        complete terminated record in the buffer (terminator stripped);
        a trailing partial record stays buffered.
        """
        if not self.readfunc:
            self._in_buffer = self._in_buffer + data
            return
        if not self.terminator:
            self.readfunc(self._in_buffer + data)
            self._in_buffer = ""
            return
        self._in_buffer = self._in_buffer + data
        # Deliver every complete record, not just the first one: a single
        # chunk may carry several terminators, and the remaining records
        # would otherwise wait until more data happens to arrive.
        while 1:
            # Re-read both attributes each round; the callback may have
            # changed or cleared them.
            terminator = self.terminator
            if not terminator or not self.readfunc:
                break
            pos = self._in_buffer.find(terminator)
            if pos < 0:
                break
            record = self._in_buffer[:pos]
            self._in_buffer = self._in_buffer[pos + len(terminator):]
            self.readfunc(record)

    # internal muck
    def handle_read(self):
        """Read at most BUFSIZE bytes and dispatch them if any arrived."""
        data = asyncore.dispatcher.recv(self, BUFSIZE)
        if data:
            if VERBOSE > 2:
                print("incoming -> %x %s" % (id(self), repr(data)))
            self.handle_incoming_data(data)

    def handle_write(self):
        """Write at most BUFSIZE bytes of queued output to the socket."""
        if self._out_buffer:
            sent = self.socket.send(self._out_buffer[:BUFSIZE])
            if VERBOSE > 2:
                print("outgoing -> %x %s" % (id(self), repr(self._out_buffer[:sent])))
            # Keep whatever the socket did not accept for the next round.
            self._out_buffer = self._out_buffer[sent:]

    def close(self):
        """Flush unread input to readfunc (if any), then close the socket."""
        if self.readfunc and self._in_buffer:
            self.readfunc(self._in_buffer)
            self._in_buffer = ""
        #elif VERBOSE > 1 and self._in_buffer:
        #    print "--- there is unread data:", `self._in_buffer`
        asyncore.dispatcher.close(self)

    def handle_close(self):
        self.close()

    def handle_connect(self):
        pass
| 155 | |
| 156 | |
class ConnectionUI:

    """Glue to let a connection tell things to the UI in a standardized way.

    The protocol defines four functions, which the connection will call:

        def settotal(int total): gets called when the connection knows the data size
        def setcurrent(int current): gets called when some new data has arrived
        def done(): gets called when the transaction is complete
        def error(type, value, tb): gets called whenever an error occurred
    """

    def __init__(self, settotal_func, setcurrent_func, done_func, error_func):
        """Store the four callbacks under their protocol names."""
        # progress reporting
        self.setcurrent = setcurrent_func
        self.settotal = settotal_func
        # completion, successful or not
        self.error = error_func
        self.done = done_func
| 174 | |
| 175 | |
class HTTPError(socket.error):

    """Socket-level error used to report an unacceptable HTTP reply."""
| 177 | |
| 178 | |
class HTTPClient(Connection, httplib.HTTP):

    """Asynchronous HTTP connection.

    Reply headers are collected until the blank line that ends them; body
    data is then streamed to ``datahandler``.  The end of the body is
    signalled by calling ``datahandler("")`` exactly once.
    """

    def __init__(self, address, datahandler, ui=None):
        """arguments:
        - address: a (host, port) tuple naming the server to connect to
        - datahandler: callable receiving each chunk of body data, and
          finally the empty string when the body is complete
        - ui: object following the ConnectionUI protocol (optional)
        """
        host, port = address
        Connection.__init__(self, (host, port))
        self.datahandler = datahandler
        self.ui = ui
        self.buf = ""
        self.doneheaders = 0
        self.done = 0
        self.headers = None
        self.length = None
        self.pos = 0

    def getreply(self):
        raise TypeError("getreply() is not supported in async HTTP connection")

    def handle_incoming_data(self, data):
        """Collect headers first, then forward everything as body data."""
        assert not self.done
        if self.doneheaders:
            self.handle_data(data)
            return
        self.buf = self.buf + data
        pos = self.buf.find("\r\n\r\n")
        if pos < 0:
            # Headers are still incomplete; wait for more data.
            return
        self.handle_reply(self.buf[:pos + 4])
        length = self.headers.getheader("Content-Length")
        if length is not None:
            self.length = int(length)
            if self.ui is not None:
                self.ui.settotal(self.length)
        else:
            # Unknown size: the body ends when the server closes.
            self.length = -1
        self.doneheaders = 1
        self.handle_data(self.buf[pos + 4:])
        self.buf = ""

    def handle_reply(self, data):
        """Parse the status line and headers.

        Raises HTTPError(code, msg) for any status other than 200 and
        ValueError for a status line that cannot be parsed.
        """
        f = cStringIO.StringIO(data)
        status_line = f.readline()
        fields = status_line.split(None, 2)
        if len(fields) < 2:
            raise ValueError("malformed HTTP status line: " + repr(status_line))
        code = int(fields[1])
        # The reason phrase may be absent; do not fail on "HTTP/1.0 200".
        if len(fields) > 2:
            msg = fields[2].strip()
        else:
            msg = ""
        if code != 200:
            # Hm, this is what *I* need, but probably not correct...
            raise HTTPError(code, msg)
        self.headers = mimetools.Message(f)

    def handle_data(self, data):
        """Account for and forward one chunk of body data."""
        self.pos = self.pos + len(data)
        if self.ui is not None:
            self.ui.setcurrent(self.pos)
        if data:
            # The empty string is reserved as the end-of-body marker, so an
            # empty fragment (e.g. nothing after the headers yet) is skipped.
            self.datahandler(data)
        # A negative length means "unknown": completion is then reported
        # from handle_close() rather than after the very first chunk.
        if 0 <= self.length <= self.pos:
            self._finish_transfer()

    def _finish_transfer(self):
        """Signal end-of-body to the data handler and the UI."""
        self.datahandler("")
        self.done = 1
        if self.ui is not None:
            self.ui.done()

    def handle_close(self):
        """Finish a body of unknown length when the server closes."""
        if self.doneheaders and not self.done and self.length < 0:
            self._finish_transfer()
        Connection.handle_close(self)

    def handle_error(self, type, value, tb):
        """Report the error to the UI when there is one."""
        if self.ui is not None:
            self.ui.error(type, value, tb)
        else:
            Connection.handle_error(self, type, value, tb)

    def log(self, message):
        if VERBOSE:
            print('LOG: %s' % (message,))
| 247 | |
| 248 | |
class PyMessage:

    """Incremental decoder for one framed message.

    A message is a header packed as struct format "ll" (payload length,
    adler32 checksum of the payload) followed by the payload itself.  The
    format uses native size and byte order, so both peers must agree on
    the size of a C long.
    """

    _HEADER_FORMAT = "ll"

    def __init__(self):
        self._buf = ""
        self._len = None
        self._checksum = None

    def feed(self, data):
        """Add received data to the message.

        Returns (1, leftover) once the payload is complete, where leftover
        is whatever followed the payload in the data fed so far; the payload
        is then available as ``self.data``.  Returns (0, None) while more
        data is needed.  Raises AssertionError if the checksum does not
        match.
        """
        import struct
        self._buf = self._buf + data
        if self._len is None:
            # Ask struct for the real header size: "ll" is 8 bytes only
            # where a C long is 4 bytes, and 16 where it is 8 bytes.
            header_size = struct.calcsize(self._HEADER_FORMAT)
            if len(self._buf) < header_size:
                return 0, None
            self._len, self._checksum = struct.unpack(
                self._HEADER_FORMAT, self._buf[:header_size])
            self._buf = self._buf[header_size:]
        if len(self._buf) < self._len:
            return 0, None
        import zlib
        data = self._buf[:self._len]
        leftover = self._buf[self._len:]
        self._buf = None
        # Raised explicitly (not via an assert statement) so the integrity
        # check also runs under python -O; the exception type is unchanged.
        if self._checksum != zlib.adler32(data):
            raise AssertionError("corrupt data")
        self.data = data
        return 1, leftover
| 276 | |
| 277 | |
class PyConnection(Connection):

    """Connection that exchanges pickled Python objects.

    Each object travels as one PyMessage frame: a packed (length, adler32)
    header followed by the pickle.
    """

    def __init__(self, sock_or_address):
        Connection.__init__(self, sock_or_address)
        # Decoder for the message currently being received.
        self.currentmessage = PyMessage()

    def handle_incoming_data(self, data):
        """Feed received data to the decoder; dispatch each finished object."""
        remaining = data
        while remaining:
            complete, remaining = self.currentmessage.feed(remaining)
            if not complete:
                # feed() handed back None as leftover, which ends the loop.
                continue
            import cPickle
            # NOTE(review): this unpickles data received from the network;
            # only safe when the peer is trusted.
            self.handle_object(cPickle.loads(self.currentmessage.data))
            self.currentmessage = PyMessage()

    def handle_object(self, object):
        """Default handler: just report the object; meant to be overridden."""
        print('unhandled object: ' + repr(object))

    def send(self, object):
        """Pickle ``object`` and queue it as one framed message."""
        import struct
        import zlib
        import cPickle
        payload = cPickle.dumps(object, 1)
        header = struct.pack("ll", len(payload), zlib.adler32(payload))
        Connection.send(self, header + payload)
| 302 | |
| 303 | |
class Echo(Connection):

    """Simple echoing connection: whatever it receives, it sends back."""

    def handle_incoming_data(self, data):
        """Queue the received data for sending instead of buffering it."""
        self.send(data)
| 310 | |
| 311 | |
class Proxy(Connection):

    """Generic proxy connection.

    Two Proxy instances are linked through their ``other`` attribute; data
    received by one is queued for sending on the other.
    """

    def __init__(self, sock_or_address=None, proxyaddr=None, closepartner=0):
        """arguments:
        - sock_or_address is either a socket or a tuple containing the
          name and port number of a remote host
        - proxyaddr: a tuple containing a name and a port number of a
          remote host (optional).
        - closepartner: boolean, specifies whether we should close
          the proxy connection (optional)"""

        Connection.__init__(self, sock_or_address)
        self.closepartner = closepartner
        self.proxyaddr = proxyaddr
        # The partner connection is created lazily by connectproxy().
        self.other = None

    def close(self):
        """Unlink from the partner (closing it too if requested), then close."""
        partner = self.other
        if partner:
            # Break the link in both directions before closing anything.
            self.other = None
            partner.other = None
            if self.closepartner:
                partner.close()
        Connection.close(self)

    def handle_incoming_data(self, data):
        """Forward received data to the partner, connecting it on first use."""
        if not self.other:
            # pass data for possible automatic remote host detection
            # (see HTTPProxy)
            data = self.connectproxy(data)
        self.other.send(data)

    def connectproxy(self, data):
        """Create and link the partner connection; return the data to forward."""
        partner = self.__class__(self.proxyaddr, closepartner=self.closepartner)
        partner.other = self
        self.other = partner
        return data
| 351 | |
| 352 | |
class HTTPProxy(Proxy):

    """Simple, useless, http proxy. It figures out itself where to connect to."""

    def connectproxy(self, data):
        """Connect to the host named in the request and return the rewritten request."""
        if VERBOSE:
            print("--- proxy request " + repr(data))
        addr, request = de_proxify(data)
        # The outgoing side is a plain Proxy: it only relays the reply back.
        partner = Proxy(addr)
        partner.other = self
        self.other = partner
        return request
| 365 | |
| 366 | |
# helper for HTTPProxy
def de_proxify(data):
    """Split a proxied GET request into target address and plain request.

    ``data`` must start with "GET http://host[:port]".  Returns
    ((host, port), request) where port defaults to 80 and request is
    ``data`` with the scheme, host and port removed from the request line.
    Raises ValueError when ``data`` does not start with such a request.
    """
    import re
    req_pattern = "GET http://([a-zA-Z0-9-_.]+)(:([0-9]+))?"
    m = re.match(req_pattern, data)
    if m is None:
        raise ValueError("not a proxied GET request: %r" % (data[:80],))
    host, dummy, port = m.groups()
    if not port:
        port = 80
    else:
        port = int(port)
    # change "GET http://xx.xx.xx/yy" into "GET /yy"; only the matched
    # prefix is rewritten, later occurrences in the data are left alone
    data = "GET " + data[m.end():]
    return (host, port), data
| 380 | |
| 381 | |
# if we're running "under W", let's register the socket poller to the event loop
try:
    import W
except ImportError:
    # W is not available, so there is no event loop to register with.
    # Only a failed import is ignored; other errors are not hidden.
    pass
else:
    W.getapplication().addidlefunc(asyncore.poll)
| 389 | |
| 390 | |
| 391 | ## testing muck |
| 392 | #testserver = Server(10000, Connection) |
| 393 | #echoserver = Server(10007, Echo) |
| 394 | #httpproxyserver = Server(8088, HTTPProxy, 5) |
| 395 | #asyncore.close_all() |