David Scherer | 7aced17 | 2000-08-15 01:13:23 +0000 | [diff] [blame] | 1 | """protocol (David Scherer <dscherer@cmu.edu>) |
| 2 | |
| 3 | This module implements a simple RPC or "distributed object" protocol. |
| 4 | I am probably the 100,000th person to write this in Python, but, hey, |
| 5 | it was fun. |
| 6 | |
| 7 | Contents: |
| 8 | |
| 9 | connectionLost is an exception that will be thrown by functions in |
| 10 | the protocol module or calls to remote methods that fail because |
| 11 | the remote program has closed the socket or because no connection |
| 12 | could be established in the first place. |
| 13 | |
| 14 | Server( port=None, connection_hook=None ) creates a server on a |
| 15 | well-known port, to which clients can connect. When a client |
| 16 | connects, a Connection is created for it. If connection_hook |
| 17 | is defined, then connection_hook( socket.getpeername() ) is called |
| 18 | before a Connection is created, and if it returns false then the |
| 19 | connection is refused. connection_hook must be prepared to be |
| 20 | called from any thread. |
| 21 | |
| 22 | Client( ip='127.0.0.1', port=None ) returns a Connection to a Server |
| 23 | object at a well-known address and port. |
| 24 | |
| 25 | Connection( socket ) creates an RPC connection on an arbitrary socket, |
| 26 | which must already be connected to another program. You do not |
| 27 | need to use this directly if you are using Client() or Server(). |
| 28 | |
| 29 | publish( name, connect_function ) provides an object with the |
| 30 | specified name to some or all Connections. When another program |
| 31 | calls Connection.getobject() with the specified name, the |
| 32 | specified connect_function is called with the arguments |
| 33 | |
| 34 | connect_function( conn, addr ) |
| 35 | |
| 36 | where conn is the Connection object to the requesting client and |
| 37 | addr is the address returned by socket.getpeername(). If that |
| 38 | function returns an object, that object becomes accessible to |
| 39 | the caller. If it returns None, the caller's request fails. |
| 40 | |
| 41 | Connection objects: |
| 42 | |
| 43 | .close() refuses additional RPC messages from the peer, and notifies |
| 44 | the peer that the connection has been closed. All pending remote |
| 45 | method calls in either program will fail with a connectionLost |
| 46 | exception. Further remote method calls on this connection will |
| 47 | also result in errors. |
| 48 | |
| 49 | .getobject(name) returns a proxy for the remote object with the |
| 50 | specified name, if it exists and the peer permits us access. |
| 51 | Otherwise, it returns None. It may throw a connectionLost |
| 52 | exception. The returned proxy supports basic attribute access |
| 53 | and method calls, and its methods have an extra attribute, |
| 54 | .void, which is a function that has the same effect but always |
| 55 | returns None. This last capability is provided as a performance |
| 56 | hack: object.method.void(params) can return without waiting for |
| 57 | the remote process to respond, but object.method(params) needs |
| 58 | to wait for a return value or exception. |
| 59 | |
| 60 | .rpc_loop(block=0) processes *incoming* messages for this connection. |
| 61 | If block=1, it continues processing until an exception or return |
| 62 | value is received, which is normally forever. Otherwise it |
| 63 | returns when all currently pending messages have been delivered. |
| 64 | It may throw a connectionLost exception. |
| 65 | |
| 66 | .set_close_hook(f) specifies a function to be called when the remote |
| 67 | object closes the connection during a call to rpc_loop(). This |
| 68 | is a good way for servers to be notified when clients disconnect. |
| 69 | |
| 70 | .set_shutdown_hook(f) specifies a function called *immediately* when |
| 71 | the receive loop detects that the connection has been lost. The |
| 72 | provided function must be prepared to run in any thread. |
| 73 | |
| 74 | Server objects: |
| 75 | |
| 76 | .rpc_loop() processes incoming messages on all connections, and |
| 77 | returns when all pending messages have been processed. It will |
| 78 | *not* throw connectionLost exceptions; the |
| 79 | Connection.set_close_hook() mechanism is much better for servers. |
| 80 | """ |
| 81 | |
| 82 | import sys, os, string, types |
| 83 | import socket |
| 84 | from threading import Thread |
| 85 | from Queue import Queue, Empty |
| 86 | from cPickle import Pickler, Unpickler, PicklingError |
| 87 | |
| 88 | class connectionLost: |
| 89 | def __init__(self, what=""): self.what = what |
| 90 | def __repr__(self): return self.what |
| 91 | def __str__(self): return self.what |
| 92 | |
| 93 | def getmethods(cls): |
| 94 | "Returns a list of the names of the methods of a class." |
| 95 | methods = [] |
| 96 | for b in cls.__bases__: |
| 97 | methods = methods + getmethods(b) |
| 98 | d = cls.__dict__ |
| 99 | for k in d.keys(): |
| 100 | if type(d[k])==types.FunctionType: |
| 101 | methods.append(k) |
| 102 | return methods |
| 103 | |
| 104 | class methodproxy: |
| 105 | "Proxy for a method of a remote object." |
| 106 | def __init__(self, classp, name): |
| 107 | self.classp=classp |
| 108 | self.name=name |
| 109 | self.client = classp.client |
| 110 | def __call__(self, *args, **keywords): |
| 111 | return self.client.call( 'm', self.classp.name, self.name, args, keywords ) |
| 112 | |
| 113 | def void(self, *args, **keywords): |
| 114 | self.client.call_void( 'm', self.classp.name,self.name,args,keywords) |
| 115 | |
| 116 | class classproxy: |
| 117 | "Proxy for a remote object." |
| 118 | def __init__(self, client, name, methods): |
| 119 | self.__dict__['client'] = client |
| 120 | self.__dict__['name'] = name |
| 121 | |
| 122 | for m in methods: |
| 123 | prox = methodproxy( self, m ) |
| 124 | self.__dict__[m] = prox |
| 125 | |
| 126 | def __getattr__(self, attr): |
| 127 | return self.client.call( 'g', self.name, attr ) |
| 128 | |
| 129 | def __setattr__(self, attr, value): |
| 130 | self.client.call_void( 's', self.name, attr, value ) |
| 131 | |
| 132 | local_connect = {} |
| 133 | def publish(name, connect_function): |
| 134 | local_connect[name]=connect_function |
| 135 | |
| 136 | class socketFile: |
| 137 | "File emulator based on a socket. Provides only blocking semantics for now." |
| 138 | |
| 139 | def __init__(self, socket): |
| 140 | self.socket = socket |
| 141 | self.buffer = '' |
| 142 | |
| 143 | def _recv(self,bytes): |
| 144 | try: |
| 145 | r=self.socket.recv(bytes) |
| 146 | except: |
| 147 | raise connectionLost() |
| 148 | if not r: |
| 149 | raise connectionLost() |
| 150 | return r |
| 151 | |
| 152 | def write(self, string): |
| 153 | try: |
| 154 | self.socket.send( string ) |
| 155 | except: |
| 156 | raise connectionLost() |
| 157 | |
| 158 | def read(self,bytes): |
| 159 | x = bytes-len(self.buffer) |
| 160 | while x>0: |
| 161 | self.buffer=self.buffer+self._recv(x) |
| 162 | x = bytes-len(self.buffer) |
| 163 | s = self.buffer[:bytes] |
| 164 | self.buffer=self.buffer[bytes:] |
| 165 | return s |
| 166 | |
| 167 | def readline(self): |
| 168 | while 1: |
| 169 | f = string.find(self.buffer,'\n') |
| 170 | if f>=0: |
| 171 | s = self.buffer[:f+1] |
| 172 | self.buffer=self.buffer[f+1:] |
| 173 | return s |
| 174 | self.buffer = self.buffer + self._recv(1024) |
| 175 | |
| 176 | |
| 177 | class Connection (Thread): |
| 178 | debug = 0 |
| 179 | def __init__(self, socket): |
| 180 | self.local_objects = {} |
| 181 | self.socket = socket |
| 182 | self.name = socket.getpeername() |
| 183 | self.socketfile = socketFile(socket) |
| 184 | self.queue = Queue(-1) |
| 185 | self.refuse_messages = 0 |
| 186 | self.cmds = { 'm': self.r_meth, |
| 187 | 'g': self.r_get, |
| 188 | 's': self.r_set, |
| 189 | 'o': self.r_geto, |
| 190 | 'e': self.r_exc, |
| 191 | #'r' handled by rpc_loop |
| 192 | } |
| 193 | |
| 194 | Thread.__init__(self) |
| 195 | self.setDaemon(1) |
| 196 | self.start() |
| 197 | |
| 198 | def getobject(self, name): |
| 199 | methods = self.call( 'o', name ) |
| 200 | if methods is None: return None |
| 201 | return classproxy(self, name, methods) |
| 202 | |
| 203 | # close_hook is called from rpc_loop(), like a normal remote method |
| 204 | # invocation |
| 205 | def set_close_hook(self,hook): self.close_hook = hook |
| 206 | |
| 207 | # shutdown_hook is called directly from the run() thread, and needs |
| 208 | # to be "thread safe" |
| 209 | def set_shutdown_hook(self,hook): self.shutdown_hook = hook |
| 210 | |
| 211 | close_hook = None |
| 212 | shutdown_hook = None |
| 213 | |
| 214 | def close(self): |
| 215 | self._shutdown() |
| 216 | self.refuse_messages = 1 |
| 217 | |
| 218 | def call(self, c, *args): |
| 219 | self.send( (c, args, 1 ) ) |
| 220 | return self.rpc_loop( block = 1 ) |
| 221 | |
| 222 | def call_void(self, c, *args): |
| 223 | try: |
| 224 | self.send( (c, args, 0 ) ) |
| 225 | except: |
| 226 | pass |
| 227 | |
| 228 | # the following methods handle individual RPC calls: |
| 229 | |
| 230 | def r_geto(self, obj): |
| 231 | c = local_connect.get(obj) |
| 232 | if not c: return None |
| 233 | o = c(self, self.name) |
| 234 | if not o: return None |
| 235 | self.local_objects[obj] = o |
| 236 | return getmethods(o.__class__) |
| 237 | |
| 238 | def r_meth(self, obj, name, args, keywords): |
| 239 | return apply( getattr(self.local_objects[obj],name), args, keywords) |
| 240 | |
| 241 | def r_get(self, obj, name): |
| 242 | return getattr(self.local_objects[obj],name) |
| 243 | |
| 244 | def r_set(self, obj, name, value): |
| 245 | setattr(self.local_objects[obj],name,value) |
| 246 | |
| 247 | def r_exc(self, e, v): |
| 248 | raise e, v |
| 249 | |
| 250 | def rpc_exec(self, cmd, arg, ret): |
| 251 | if self.refuse_messages: return |
| 252 | if self.debug: print cmd,arg,ret |
| 253 | if ret: |
| 254 | try: |
| 255 | r=apply(self.cmds.get(cmd), arg) |
| 256 | self.send( ('r', r, 0) ) |
| 257 | except: |
| 258 | try: |
| 259 | self.send( ('e', sys.exc_info()[:2], 0) ) |
| 260 | except PicklingError: |
| 261 | self.send( ('e', (TypeError, 'Unpicklable exception.'), 0 ) ) |
| 262 | else: |
| 263 | # we cannot report exceptions to the caller, so |
| 264 | # we report them in this process. |
| 265 | r=apply(self.cmds.get(cmd), arg) |
| 266 | |
| 267 | # the following methods implement the RPC and message loops: |
| 268 | |
| 269 | def rpc_loop(self, block=0): |
| 270 | if self.refuse_messages: raise connectionLost('(already closed)') |
| 271 | try: |
| 272 | while 1: |
| 273 | try: |
| 274 | cmd, arg, ret = self.queue.get( block ) |
| 275 | except Empty: |
| 276 | return None |
| 277 | if cmd=='r': return arg |
| 278 | self.rpc_exec(cmd,arg,ret) |
| 279 | except connectionLost: |
| 280 | if self.close_hook: |
| 281 | self.close_hook() |
| 282 | self.close_hook = None |
| 283 | raise |
| 284 | |
| 285 | def run(self): |
| 286 | try: |
| 287 | while 1: |
| 288 | data = self.recv() |
| 289 | self.queue.put( data ) |
| 290 | except: |
| 291 | self.queue.put( ('e', sys.exc_info()[:2], 0) ) |
| 292 | |
| 293 | # The following send raw pickled data to the peer |
| 294 | |
| 295 | def send(self, data): |
| 296 | try: |
| 297 | Pickler(self.socketfile,1).dump( data ) |
| 298 | except connectionLost: |
| 299 | self._shutdown() |
| 300 | if self.shutdown_hook: self.shutdown_hook() |
| 301 | raise |
| 302 | |
| 303 | def recv(self): |
| 304 | try: |
| 305 | return Unpickler(self.socketfile).load() |
| 306 | except connectionLost: |
| 307 | self._shutdown() |
| 308 | if self.shutdown_hook: self.shutdown_hook() |
| 309 | raise |
| 310 | except: |
| 311 | raise |
| 312 | |
| 313 | def _shutdown(self): |
| 314 | try: |
| 315 | self.socket.shutdown(1) |
| 316 | self.socket.close() |
| 317 | except: |
| 318 | pass |
| 319 | |
| 320 | |
| 321 | class Server (Thread): |
| 322 | default_port = 0x1D1E # "IDlE" |
| 323 | |
| 324 | def __init__(self, port=None, connection_hook=None): |
| 325 | self.connections = [] |
| 326 | self.port = port or self.default_port |
| 327 | self.connection_hook = connection_hook |
| 328 | |
| 329 | try: |
| 330 | self.wellknown = s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
Nicholas Riley | 9a580c4 | 2000-09-24 06:29:50 +0000 | [diff] [blame] | 331 | s.bind(('', self.port)) |
David Scherer | 7aced17 | 2000-08-15 01:13:23 +0000 | [diff] [blame] | 332 | s.listen(3) |
| 333 | except: |
| 334 | raise connectionLost |
| 335 | |
| 336 | Thread.__init__(self) |
| 337 | self.setDaemon(1) |
| 338 | self.start() |
| 339 | |
| 340 | def run(self): |
| 341 | s = self.wellknown |
| 342 | while 1: |
| 343 | conn, addr = s.accept() |
| 344 | if self.connection_hook and not self.connection_hook(addr): |
| 345 | try: |
| 346 | conn.shutdown(1) |
| 347 | except: |
| 348 | pass |
| 349 | continue |
| 350 | self.connections.append( Connection(conn) ) |
| 351 | |
| 352 | def rpc_loop(self): |
| 353 | cns = self.connections[:] |
| 354 | for c in cns: |
| 355 | try: |
| 356 | c.rpc_loop(block = 0) |
| 357 | except connectionLost: |
| 358 | if c in self.connections: |
| 359 | self.connections.remove(c) |
| 360 | |
| 361 | def Client(ip='127.0.0.1', port=None): |
| 362 | try: |
| 363 | s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
Nicholas Riley | 9a580c4 | 2000-09-24 06:29:50 +0000 | [diff] [blame] | 364 | s.connect((ip,port or Server.default_port)) |
David Scherer | 7aced17 | 2000-08-15 01:13:23 +0000 | [diff] [blame] | 365 | except socket.error, what: |
| 366 | raise connectionLost(str(what)) |
| 367 | except: |
| 368 | raise connectionLost() |
| 369 | return Connection(s) |