| """protocol (David Scherer <dscherer@cmu.edu>) |
| |
| This module implements a simple RPC or "distributed object" protocol. |
| I am probably the 100,000th person to write this in Python, but, hey, |
| it was fun. |
| |
| Contents: |
| |
| connectionLost is an exception that will be thrown by functions in |
| the protocol module or calls to remote methods that fail because |
| the remote program has closed the socket or because no connection |
| could be established in the first place. |
| |
| Server( port=None, connection_hook=None ) creates a server on a |
| well-known port, to which clients can connect. When a client |
| connects, a Connection is created for it. If connection_hook |
| is defined, then connection_hook( socket.getpeername() ) is called |
| before a Connection is created, and if it returns false then the |
| connection is refused. connection_hook must be prepared to be |
| called from any thread. |
| |
| Client( ip='127.0.0.1', port=None ) returns a Connection to a Server |
| object at a well-known address and port. |
| |
| Connection( socket ) creates an RPC connection on an arbitrary socket, |
| which must already be connected to another program. You do not |
| need to use this directly if you are using Client() or Server(). |
| |
| publish( name, connect_function ) provides an object with the |
| specified name to some or all Connections. When another program |
| calls Connection.getobject() with the specified name, the |
| specified connect_function is called with the arguments |
| |
| connect_function( conn, addr ) |
| |
| where conn is the Connection object to the requesting client and |
| addr is the address returned by socket.getpeername(). If that |
| function returns an object, that object becomes accessible to |
| the caller. If it returns None, the caller's request fails. |
| |
| Connection objects: |
| |
| .close() refuses additional RPC messages from the peer, and notifies |
| the peer that the connection has been closed. All pending remote |
| method calls in either program will fail with a connectionLost |
| exception. Further remote method calls on this connection will |
| also result in errors. |
| |
| .getobject(name) returns a proxy for the remote object with the |
| specified name, if it exists and the peer permits us access. |
| Otherwise, it returns None. It may throw a connectionLost |
| exception. The returned proxy supports basic attribute access |
| and method calls, and its methods have an extra attribute, |
| .void, which is a function that has the same effect but always |
| returns None. This last capability is provided as a performance |
| hack: object.method.void(params) can return without waiting for |
| the remote process to respond, but object.method(params) needs |
| to wait for a return value or exception. |
| |
| .rpc_loop(block=0) processes *incoming* messages for this connection. |
| If block=1, it continues processing until an exception or return |
| value is received, which is normally forever. Otherwise it |
| returns when all currently pending messages have been delivered. |
| It may throw a connectionLost exception. |
| |
| .set_close_hook(f) specifies a function to be called when the remote |
| object closes the connection during a call to rpc_loop(). This |
| is a good way for servers to be notified when clients disconnect. |
| |
| .set_shutdown_hook(f) specifies a function called *immediately* when |
| the receive loop detects that the connection has been lost. The |
| provided function must be prepared to run in any thread. |
| |
| Server objects: |
| |
| .rpc_loop() processes incoming messages on all connections, and |
| returns when all pending messages have been processed. It will |
| *not* throw connectionLost exceptions; the |
| Connection.set_close_hook() mechanism is much better for servers. |
| """ |
| |
| import sys, os, string, types |
| import socket |
| from threading import Thread |
| from Queue import Queue, Empty |
| from cPickle import Pickler, Unpickler, PicklingError |
| |
| class connectionLost: |
| def __init__(self, what=""): self.what = what |
| def __repr__(self): return self.what |
| def __str__(self): return self.what |
| |
| def getmethods(cls): |
| "Returns a list of the names of the methods of a class." |
| methods = [] |
| for b in cls.__bases__: |
| methods = methods + getmethods(b) |
| d = cls.__dict__ |
| for k in d.keys(): |
| if type(d[k])==types.FunctionType: |
| methods.append(k) |
| return methods |
| |
| class methodproxy: |
| "Proxy for a method of a remote object." |
| def __init__(self, classp, name): |
| self.classp=classp |
| self.name=name |
| self.client = classp.client |
| def __call__(self, *args, **keywords): |
| return self.client.call( 'm', self.classp.name, self.name, args, keywords ) |
| |
| def void(self, *args, **keywords): |
| self.client.call_void( 'm', self.classp.name,self.name,args,keywords) |
| |
| class classproxy: |
| "Proxy for a remote object." |
| def __init__(self, client, name, methods): |
| self.__dict__['client'] = client |
| self.__dict__['name'] = name |
| |
| for m in methods: |
| prox = methodproxy( self, m ) |
| self.__dict__[m] = prox |
| |
| def __getattr__(self, attr): |
| return self.client.call( 'g', self.name, attr ) |
| |
| def __setattr__(self, attr, value): |
| self.client.call_void( 's', self.name, attr, value ) |
| |
| local_connect = {} |
| def publish(name, connect_function): |
| local_connect[name]=connect_function |
| |
| class socketFile: |
| "File emulator based on a socket. Provides only blocking semantics for now." |
| |
| def __init__(self, socket): |
| self.socket = socket |
| self.buffer = '' |
| |
| def _recv(self,bytes): |
| try: |
| r=self.socket.recv(bytes) |
| except: |
| raise connectionLost() |
| if not r: |
| raise connectionLost() |
| return r |
| |
| def write(self, string): |
| try: |
| self.socket.send( string ) |
| except: |
| raise connectionLost() |
| |
| def read(self,bytes): |
| x = bytes-len(self.buffer) |
| while x>0: |
| self.buffer=self.buffer+self._recv(x) |
| x = bytes-len(self.buffer) |
| s = self.buffer[:bytes] |
| self.buffer=self.buffer[bytes:] |
| return s |
| |
| def readline(self): |
| while 1: |
| f = string.find(self.buffer,'\n') |
| if f>=0: |
| s = self.buffer[:f+1] |
| self.buffer=self.buffer[f+1:] |
| return s |
| self.buffer = self.buffer + self._recv(1024) |
| |
| |
| class Connection (Thread): |
| debug = 0 |
| def __init__(self, socket): |
| self.local_objects = {} |
| self.socket = socket |
| self.name = socket.getpeername() |
| self.socketfile = socketFile(socket) |
| self.queue = Queue(-1) |
| self.refuse_messages = 0 |
| self.cmds = { 'm': self.r_meth, |
| 'g': self.r_get, |
| 's': self.r_set, |
| 'o': self.r_geto, |
| 'e': self.r_exc, |
| #'r' handled by rpc_loop |
| } |
| |
| Thread.__init__(self) |
| self.setDaemon(1) |
| self.start() |
| |
| def getobject(self, name): |
| methods = self.call( 'o', name ) |
| if methods is None: return None |
| return classproxy(self, name, methods) |
| |
| # close_hook is called from rpc_loop(), like a normal remote method |
| # invocation |
| def set_close_hook(self,hook): self.close_hook = hook |
| |
| # shutdown_hook is called directly from the run() thread, and needs |
| # to be "thread safe" |
| def set_shutdown_hook(self,hook): self.shutdown_hook = hook |
| |
| close_hook = None |
| shutdown_hook = None |
| |
| def close(self): |
| self._shutdown() |
| self.refuse_messages = 1 |
| |
| def call(self, c, *args): |
| self.send( (c, args, 1 ) ) |
| return self.rpc_loop( block = 1 ) |
| |
| def call_void(self, c, *args): |
| try: |
| self.send( (c, args, 0 ) ) |
| except: |
| pass |
| |
| # the following methods handle individual RPC calls: |
| |
| def r_geto(self, obj): |
| c = local_connect.get(obj) |
| if not c: return None |
| o = c(self, self.name) |
| if not o: return None |
| self.local_objects[obj] = o |
| return getmethods(o.__class__) |
| |
| def r_meth(self, obj, name, args, keywords): |
| return apply( getattr(self.local_objects[obj],name), args, keywords) |
| |
| def r_get(self, obj, name): |
| return getattr(self.local_objects[obj],name) |
| |
| def r_set(self, obj, name, value): |
| setattr(self.local_objects[obj],name,value) |
| |
| def r_exc(self, e, v): |
| raise e, v |
| |
| def rpc_exec(self, cmd, arg, ret): |
| if self.refuse_messages: return |
| if self.debug: print cmd,arg,ret |
| if ret: |
| try: |
| r=apply(self.cmds.get(cmd), arg) |
| self.send( ('r', r, 0) ) |
| except: |
| try: |
| self.send( ('e', sys.exc_info()[:2], 0) ) |
| except PicklingError: |
| self.send( ('e', (TypeError, 'Unpicklable exception.'), 0 ) ) |
| else: |
| # we cannot report exceptions to the caller, so |
| # we report them in this process. |
| r=apply(self.cmds.get(cmd), arg) |
| |
| # the following methods implement the RPC and message loops: |
| |
| def rpc_loop(self, block=0): |
| if self.refuse_messages: raise connectionLost('(already closed)') |
| try: |
| while 1: |
| try: |
| cmd, arg, ret = self.queue.get( block ) |
| except Empty: |
| return None |
| if cmd=='r': return arg |
| self.rpc_exec(cmd,arg,ret) |
| except connectionLost: |
| if self.close_hook: |
| self.close_hook() |
| self.close_hook = None |
| raise |
| |
| def run(self): |
| try: |
| while 1: |
| data = self.recv() |
| self.queue.put( data ) |
| except: |
| self.queue.put( ('e', sys.exc_info()[:2], 0) ) |
| |
| # The following send raw pickled data to the peer |
| |
| def send(self, data): |
| try: |
| Pickler(self.socketfile,1).dump( data ) |
| except connectionLost: |
| self._shutdown() |
| if self.shutdown_hook: self.shutdown_hook() |
| raise |
| |
| def recv(self): |
| try: |
| return Unpickler(self.socketfile).load() |
| except connectionLost: |
| self._shutdown() |
| if self.shutdown_hook: self.shutdown_hook() |
| raise |
| except: |
| raise |
| |
| def _shutdown(self): |
| try: |
| self.socket.shutdown(1) |
| self.socket.close() |
| except: |
| pass |
| |
| |
| class Server (Thread): |
| default_port = 0x1D1E # "IDlE" |
| |
| def __init__(self, port=None, connection_hook=None): |
| self.connections = [] |
| self.port = port or self.default_port |
| self.connection_hook = connection_hook |
| |
| try: |
| self.wellknown = s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| s.bind(('', self.port)) |
| s.listen(3) |
| except: |
| raise connectionLost |
| |
| Thread.__init__(self) |
| self.setDaemon(1) |
| self.start() |
| |
| def run(self): |
| s = self.wellknown |
| while 1: |
| conn, addr = s.accept() |
| if self.connection_hook and not self.connection_hook(addr): |
| try: |
| conn.shutdown(1) |
| except: |
| pass |
| continue |
| self.connections.append( Connection(conn) ) |
| |
| def rpc_loop(self): |
| cns = self.connections[:] |
| for c in cns: |
| try: |
| c.rpc_loop(block = 0) |
| except connectionLost: |
| if c in self.connections: |
| self.connections.remove(c) |
| |
| def Client(ip='127.0.0.1', port=None): |
| try: |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| s.connect((ip,port or Server.default_port)) |
| except socket.error, what: |
| raise connectionLost(str(what)) |
| except: |
| raise connectionLost() |
| return Connection(s) |