| import select |
| import socket |
| import struct |
| import sys |
| import types |
| |
# Trace sink for the socket layer: when set to an object with a write()
# method, SocketProtocol writes a copy of every frame it sends or receives
# to it.  None disables tracing; the -v command line flag sets it to
# sys.stderr.
VERBOSE = None
| |
class SocketProtocol:
    """Length-prefixed string framing on top of a connected socket.

    Every string travels as ``<decimal length>:<payload>``, which lets the
    receiver split the incoming stream back into the strings that were
    sent, however the transport happens to chunk them.
    """

    BUF_SIZE = 8192  # maximum amount of data requested per recv() call

    def __init__(self, sock):
        self.sock = sock
        self._buffer = ''  # data received but not yet handed to the caller
        self._closed = 0   # set to 1 by close()

    def close(self):
        """Mark the protocol as closed and close the underlying socket."""
        self._closed = 1
        self.sock.close()

    def send(self, buf):
        """Encode buf as ``<len>:<buf>`` and write all of it to the socket."""
        if VERBOSE:
            VERBOSE.write('send %d:%s\n' % (len(buf), repr(buf)))
        # sendall() instead of send(): send() may transmit only a prefix of
        # the data, which would leave a truncated frame on the wire and
        # desynchronise the receiver.
        self.sock.sendall('%d:%s' % (len(buf), buf))

    def receive(self, timeout=0):
        """Return the next complete string, or None if none is available.

        timeout is handed to select(): 0 polls, a positive number waits at
        most that many seconds for data.  With a numeric timeout at most
        one read is attempted, so None is returned when the data received
        so far does not yet form a complete string.  With timeout=None the
        call blocks and keeps reading until a complete string has arrived.

        Raise EOFError when the connection has been closed and no further
        complete string can be produced.  Raise ValueError when the
        buffered data does not start with a valid length prefix.
        """
        msg = self._read_from_buffer()
        if msg is not None:
            return msg
        while 1:
            chunk = self._read_from_socket(timeout)
            if chunk is None:
                # Nothing became readable within the timeout.
                return None
            if not chunk:
                # EOF (or close() was called).  Anything still buffered is
                # an incomplete frame that can never be completed.
                raise EOFError
            if VERBOSE:
                VERBOSE.write('recv %s\n' % repr(chunk))
            self._buffer = self._buffer + chunk
            msg = self._read_from_buffer()
            if msg is not None or timeout is not None:
                return msg
            # Blocking mode and the frame is still incomplete: read more.

    def _read_from_socket(self, timeout):
        """Read one chunk of at most BUF_SIZE from the socket.

        Returns '' once close() has been called, None when select() reports
        nothing readable within timeout, and otherwise whatever recv()
        returns.  With timeout=None select() is skipped and recv() is
        called directly, so the call blocks until data or EOF arrives.
        """
        if self._closed:
            return ''
        if timeout is not None:
            readable, _, _ = select.select([self.sock], [], [], timeout)
            if not readable:
                return None
        return self.sock.recv(self.BUF_SIZE)

    def _read_from_buffer(self):
        """Pop and return the first complete string in the buffer, or None.

        The buffer is left untouched when it does not yet hold a complete
        frame.  Raise ValueError if the length prefix is not a non-negative
        integer.
        """
        data = self._buffer
        colon = data.find(':')
        if colon == -1:
            return None
        length = int(data[:colon])
        if length < 0:
            raise ValueError("negative length prefix: %d" % length)
        end = colon + 1 + length
        if len(data) < end:
            return None
        self._buffer = data[end:]
        return data[colon + 1:end]
| |
| # helpers for registerHandler method below |
| |
def get_methods(obj):
    """Return the names of the callable attributes of obj, without repeats.

    The names found by dir(obj) come first.  For an old-style instance the
    names found on its class are added, and for an old-style class the
    names found on each of its base classes, so inherited methods are
    reported even where dir() does not follow the inheritance chain.
    Each name is listed once, in the order it was first seen.
    """
    # These type objects only exist on interpreters with old-style classes.
    instance_type = getattr(types, 'InstanceType', None)
    class_type = getattr(types, 'ClassType', None)

    found = []
    for name in dir(obj):
        if callable(getattr(obj, name)):
            found.append(name)
    if instance_type is not None and type(obj) is instance_type:
        found = found + get_methods(obj.__class__)
    if class_type is not None and type(obj) is class_type:
        for base in obj.__bases__:
            found = found + get_methods(base)

    # Drop repeats while keeping first-seen order.
    seen = {}
    unique = []
    for name in found:
        if name not in seen:
            seen[name] = 1
            unique.append(name)
    return unique
| |
class CommandProtocol:
    """Request/reply command layer on top of a SocketProtocol.

    A message is a packed sequence number (SEQNO_ENC_LEN bytes) followed by
    either ``<cmd>`` or ``<cmd> <arg>``.  send() transmits a command and
    waits for the matching "reply" message; dispatch() receives a command,
    runs the registered handler and sends the reply.
    """

    SEQNO_ENC_LEN = 4  # size of the packed sequence number on the wire

    # "=" selects native byte order with the standard 4-byte size for "I",
    # so the encoded width always equals SEQNO_ENC_LEN.
    _SEQNO_FORMAT = "=I"

    def __init__(self, sockp):
        self.sockp = sockp
        self.seqno = 0
        self.handlers = {}
        # Sequence number of the command being dispatched; reply() resets
        # it to None once the reply has been sent.
        self._current_reply = None

    def close(self):
        """Close the underlying protocol object and drop all handlers."""
        self.sockp.close()
        self.handlers.clear()

    def registerHandler(self, handler):
        """A Handler is an object with handle_XXX methods.

        Each handle_XXX method becomes the handler for command XXX.
        """
        for methname in get_methods(handler):
            if methname[:7] == "handle_":
                self.handlers[methname[7:]] = getattr(handler, methname)

    def send(self, cmd, arg='', seqno=None):
        """Send a command and return the argument of its reply.

        A fresh sequence number is allocated when seqno is None.  For the
        "reply" command nothing is awaited and None is returned.

        Raise AssertionError("bad reply") if the next message received is
        not the reply to this command.
        """
        if arg:
            msg = "%s %s" % (cmd, arg)
        else:
            msg = cmd
        if seqno is None:
            seqno = self.get_seqno()
        self.sockp.send(self.encode_seqno(seqno) + msg)
        if cmd == "reply":
            return
        reply = self.sockp.receive(timeout=None)
        r_cmd, r_arg, r_seqno = self._decode_msg(reply)
        # Explicit raise rather than an assert statement, so the check is
        # still performed when Python runs with -O.
        if r_seqno != seqno or r_cmd != "reply":
            raise AssertionError("bad reply")
        return r_arg

    def _decode_msg(self, msg):
        """Split a received message into (cmd, arg, seqno)."""
        seqno = self.decode_seqno(msg[:self.SEQNO_ENC_LEN])
        body = msg[self.SEQNO_ENC_LEN:]
        # Split on the first space only: the argument is free-form text
        # (program output, file names) and may contain spaces itself.
        parts = body.split(" ", 1)
        cmd = parts[0]
        if len(parts) == 2:
            arg = parts[1]
        else:
            arg = ''
        return cmd, arg, seqno

    def dispatch(self):
        """Handle at most one pending command.

        Returns without doing anything when no complete message is
        available.  Otherwise the handler registered for the command (or
        default_handler) is called with the argument, and its return value
        (None counts as '') is sent back as the reply, unless the handler
        already replied through reply().

        Raise TypeError, prefixed with the handler name, if the handler
        raises TypeError.  Raise ValueError if the handler returns
        something other than a string or None.
        """
        msg = self.sockp.receive()
        if msg is None:
            return
        cmd, arg, seqno = self._decode_msg(msg)
        self._current_reply = seqno
        handler = self.handlers.get(cmd, self.default_handler)
        try:
            result = handler(arg)
        except TypeError:
            raise TypeError("handle_%s: %s" % (cmd, sys.exc_info()[1]))
        if self._current_reply is None:
            # The handler called reply() itself; there is nowhere left to
            # send a return value.
            if result is not None:
                sys.stderr.write("ignoring %s return value type %s\n" %
                                 (cmd, type(result).__name__))
            return
        if result is None:
            result = ''
        if not isinstance(result, str):
            raise ValueError("invalid return type for %s" % cmd)
        self.send("reply", result, seqno=seqno)

    def reply(self, arg=''):
        """Send a reply immediately

        otherwise reply will be sent when handler returns
        """
        self.send("reply", arg, self._current_reply)
        self._current_reply = None

    def default_handler(self, arg):
        """Warn on stderr about a command that has no registered handler."""
        sys.stderr.write("WARNING: unhandled message %s\n" % arg)
        return ''

    def get_seqno(self):
        """Return the next unused sequence number."""
        seqno = self.seqno
        self.seqno = seqno + 1
        return seqno

    def encode_seqno(self, seqno):
        """Pack seqno into its SEQNO_ENC_LEN-byte wire form."""
        return struct.pack(self._SEQNO_FORMAT, seqno)

    def decode_seqno(self, buf):
        """Inverse of encode_seqno()."""
        return struct.unpack(self._SEQNO_FORMAT, buf)[0]
| |
| |
class StdioRedirector:
    """Swap sys.stdin, sys.stdout and sys.stderr for replacement objects.

    redirect() installs the replacements given to the constructor;
    restore() puts back whatever streams were current at the last save().
    """

    def __init__(self, stdin, stdout, stderr):
        self.stdin, self.stdout, self.stderr = stdin, stdout, stderr

    def redirect(self):
        """Remember the current streams, then install the replacements."""
        self.save()
        sys.stdin, sys.stdout, sys.stderr = (
            self.stdin, self.stdout, self.stderr)

    def save(self):
        """Record the streams currently installed in sys."""
        self._stdin, self._stdout, self._stderr = (
            sys.stdin, sys.stdout, sys.stderr)

    def restore(self):
        """Reinstall the streams recorded by the last save()."""
        sys.stdin, sys.stdout, sys.stderr = (
            self._stdin, self._stdout, self._stderr)
| |
class IOWrapper:
    """Common state for file-like objects that forward I/O to the peer.

    name is the command name the subclasses send through cmdp.

    XXX Should this be more tightly integrated with the CommandProtocol?
    """

    def __init__(self, name, cmdp):
        self.buffer = []
        self.cmdp = cmdp
        self.name = name
| |
class InputWrapper(IOWrapper):
    """Read-only file-like object whose reads are served by the peer."""

    def write(self, buf):
        """Always fail: this object is for input only."""
        # XXX what should this do on Windows?
        raise IOError(9, '[Errno 9] Bad file descriptor')

    def read(self, arg=None):
        """Read up to arg characters from the peer; all of it by default.

        As with ordinary file objects, a missing or negative size means
        "read until EOF" and a size of 0 returns '' without contacting
        the peer.
        """
        if arg is None or arg < 0:
            # On the wire a count of 0 asks the peer to read everything
            # (see RIClient.handle_stdin).
            arg = 0
        elif arg == 0:
            return ''
        return self.cmdp.send(self.name, "read,%s" % arg)

    def readline(self):
        """Ask the peer for one line of input and return it."""
        return self.cmdp.send(self.name, "readline")
| |
class OutputWrapper(IOWrapper):
    """Write-only file-like object that forwards each write to the peer."""

    def write(self, buf):
        """Send buf to the peer as the argument of this wrapper's command."""
        self.cmdp.send(self.name, buf)

    def flush(self):
        """Do nothing: write() sends its data immediately.

        Provided because this object is installed as sys.stdout and
        sys.stderr, and redirected programs commonly call flush() on
        those streams.
        """
        pass

    def read(self, arg=None):
        """Output streams have nothing to read; always return ''."""
        return ''
| |
class RemoteInterp:
    """Server side: executes files on request, forwarding stdio to the peer."""

    def __init__(self, sock):
        self._sock = SocketProtocol(sock)
        self._cmd = CommandProtocol(self._sock)
        self._cmd.registerHandler(self)

    def run(self):
        """Dispatch incoming commands until the connection reaches EOF."""
        try:
            while 1:
                self._cmd.dispatch()
        except EOFError:
            pass

    def handle_execfile(self, arg):
        """Execute the file named by arg with stdio redirected to the peer.

        The command is acknowledged before execution starts, and
        "terminated" is sent once the file has run to completion.  An
        exception raised by the executed file propagates to the caller.
        """
        # NOTE(security): this runs whatever file path the peer sends.
        self._cmd.reply()
        io = StdioRedirector(InputWrapper("stdin", self._cmd),
                             OutputWrapper("stdout", self._cmd),
                             OutputWrapper("stderr", self._cmd))
        io.redirect()
        try:
            execfile(arg, {'__name__': '__main__'})
        finally:
            # Put the real streams back even if the script raised, so that
            # this process does not keep writing to the peer afterwards.
            io.restore()
        self._cmd.send("terminated")

    def handle_quit(self, arg):
        """Acknowledge the quit command, then close the connection."""
        self._cmd.reply()
        self._cmd.close()
| |
def startRemoteInterp(id):
    """Serve one client on the UNIX domain socket /var/tmp/ri.<id>.

    Binds the socket, accepts a single connection and runs a RemoteInterp
    on it until that connection ends.  Both sockets are closed on the way
    out, and the socket file is removed once it has been created.
    """
    import os
    path = "/var/tmp/ri.%s" % id
    # UNIX domain sockets are simpler for starters
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(path)
        try:
            sock.listen(1)
            cli, addr = sock.accept()
            try:
                rinterp = RemoteInterp(cli)
                rinterp.run()
            finally:
                cli.close()
        finally:
            os.unlink(path)
    finally:
        sock.close()
| |
class RIClient:
    """Client of the remote interpreter"""

    def __init__(self, sock):
        self._sock = SocketProtocol(sock)
        self._cmd = CommandProtocol(self._sock)
        self._cmd.registerHandler(self)

    def execfile(self, file):
        """Ask the remote interpreter to execute the named file."""
        self._cmd.send("execfile", file)

    def run(self):
        """Dispatch incoming commands until the connection reaches EOF."""
        try:
            while 1:
                self._cmd.dispatch()
        except EOFError:
            return

    def handle_stdout(self, buf):
        """Copy remote standard output to the local sys.stdout."""
        # The stream is deliberately not flushed after each write.
        sys.stdout.write(buf)

    def handle_stderr(self, buf):
        """Copy remote standard error to the local sys.stderr."""
        sys.stderr.write(buf)

    def handle_stdin(self, arg):
        """Serve a remote read request from the local sys.stdin.

        arg is "readline", or text ending in ",<count>" where a count of
        0 means read everything.
        """
        if arg == "readline":
            return sys.stdin.readline()
        count = int(arg[arg.find(",") + 1:])
        if count:
            return sys.stdin.read(count)
        return sys.stdin.read()

    def handle_terminated(self, arg):
        """Acknowledge termination, tell the server to quit, and close."""
        self._cmd.reply()
        self._cmd.send("quit")
        self._cmd.close()
| |
def riExec(id, file):
    """Run file on the remote interpreter listening at /var/tmp/ri.<id>.

    Connects, requests execution of file, then services the interpreter's
    stdio requests until the connection ends.  The socket is closed on the
    way out, including when connecting or running fails.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.connect("/var/tmp/ri.%s" % id)
        cli = RIClient(sock)
        cli.execfile(file)
        cli.run()
    finally:
        sock.close()
| |
if __name__ == "__main__":
    # Command line:  [-c] [-v] id [file]
    #   -c   act as the client and run `file` on the server named `id`
    #        (the default is to act as the server)
    #   -v   trace every frame sent or received on sys.stderr
    import sys
    import getopt

    USAGE = "usage: %s [-c] [-v] id [file]\n" % sys.argv[0]

    SERVER = 1
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'cv')
    except getopt.error:
        sys.stderr.write("%s\n" % sys.exc_info()[1])
        sys.stderr.write(USAGE)
        sys.exit(2)
    for o, v in opts:
        if o == '-c':
            SERVER = 0
        elif o == '-v':
            VERBOSE = sys.stderr

    # The server needs an id; the client needs an id and a file to run.
    if not args or (not SERVER and len(args) < 2):
        sys.stderr.write(USAGE)
        sys.exit(2)
    id = args[0]

    if SERVER:
        startRemoteInterp(id)
    else:
        file = args[1]
        riExec(id, file)