blob: f3f638253c5c21fc8cf821c569268ebcce81d9e9 [file] [log] [blame]
David Scherer7aced172000-08-15 01:13:23 +00001"""protocol (David Scherer <dscherer@cmu.edu>)
2
3 This module implements a simple RPC or "distributed object" protocol.
4 I am probably the 100,000th person to write this in Python, but, hey,
5 it was fun.
6
7 Contents:
8
9 connectionLost is an exception that will be thrown by functions in
10 the protocol module or calls to remote methods that fail because
11 the remote program has closed the socket or because no connection
12 could be established in the first place.
13
14 Server( port=None, connection_hook=None ) creates a server on a
15 well-known port, to which clients can connect. When a client
16 connects, a Connection is created for it. If connection_hook
17 is defined, then connection_hook( socket.getpeername() ) is called
18 before a Connection is created, and if it returns false then the
19 connection is refused. connection_hook must be prepared to be
20 called from any thread.
21
22 Client( ip='127.0.0.1', port=None ) returns a Connection to a Server
23 object at a well-known address and port.
24
25 Connection( socket ) creates an RPC connection on an arbitrary socket,
26 which must already be connected to another program. You do not
27 need to use this directly if you are using Client() or Server().
28
29 publish( name, connect_function ) provides an object with the
30 specified name to some or all Connections. When another program
31 calls Connection.getobject() with the specified name, the
32 specified connect_function is called with the arguments
33
34 connect_function( conn, addr )
35
36 where conn is the Connection object to the requesting client and
37 addr is the address returned by socket.getpeername(). If that
38 function returns an object, that object becomes accessible to
39 the caller. If it returns None, the caller's request fails.
40
41 Connection objects:
42
43 .close() refuses additional RPC messages from the peer, and notifies
44 the peer that the connection has been closed. All pending remote
45 method calls in either program will fail with a connectionLost
46 exception. Further remote method calls on this connection will
47 also result in errors.
48
49 .getobject(name) returns a proxy for the remote object with the
50 specified name, if it exists and the peer permits us access.
51 Otherwise, it returns None. It may throw a connectionLost
52 exception. The returned proxy supports basic attribute access
53 and method calls, and its methods have an extra attribute,
54 .void, which is a function that has the same effect but always
55 returns None. This last capability is provided as a performance
56 hack: object.method.void(params) can return without waiting for
57 the remote process to respond, but object.method(params) needs
58 to wait for a return value or exception.
59
60 .rpc_loop(block=0) processes *incoming* messages for this connection.
61 If block=1, it continues processing until an exception or return
62 value is received, which is normally forever. Otherwise it
63 returns when all currently pending messages have been delivered.
64 It may throw a connectionLost exception.
65
66 .set_close_hook(f) specifies a function to be called when the remote
67 object closes the connection during a call to rpc_loop(). This
68 is a good way for servers to be notified when clients disconnect.
69
70 .set_shutdown_hook(f) specifies a function called *immediately* when
71 the receive loop detects that the connection has been lost. The
72 provided function must be prepared to run in any thread.
73
74 Server objects:
75
76 .rpc_loop() processes incoming messages on all connections, and
77 returns when all pending messages have been processed. It will
78 *not* throw connectionLost exceptions; the
79 Connection.set_close_hook() mechanism is much better for servers.
80"""
81
82import sys, os, string, types
83import socket
84from threading import Thread
85from Queue import Queue, Empty
86from cPickle import Pickler, Unpickler, PicklingError
87
class connectionLost(Exception):
    """Raised when the peer has gone away or no connection could be made.

    The optional ``what`` argument is a human-readable reason; it is kept
    on the instance as ``.what`` and is what str() and repr() return.

    Deriving from Exception (instead of being a bare class) lets generic
    ``except Exception`` handlers see it and keeps the class raisable on
    interpreters that require exceptions to derive from the built-in
    exception hierarchy.
    """

    def __init__(self, what=""):
        Exception.__init__(self, what)
        self.what = what

    def __repr__(self):
        # str() guards against a non-string ``what``; __repr__ must
        # return a string.
        return str(self.what)

    def __str__(self):
        return str(self.what)
92
def getmethods(cls):
    """Return the names of the methods of a class, base classes included.

    Base classes are visited first (recursively, in ``__bases__`` order),
    then the class's own ``__dict__``.  Only plain functions count as
    methods.  Each name appears once, even when a method is overridden in
    a subclass or reachable through more than one base class.
    """
    methods = []
    for base in cls.__bases__:
        for name in getmethods(base):
            if name not in methods:
                methods.append(name)
    for name, value in cls.__dict__.items():
        if isinstance(value, types.FunctionType) and name not in methods:
            methods.append(name)
    return methods
103
class methodproxy:
    """Callable stand-in for one method of a remote object.

    Calling the proxy forwards an 'm' (method) request through the owning
    proxy's client and returns whatever the client's call() returns.
    The .void() variant sends the same request through call_void() and
    always returns None.
    """

    def __init__(self, classp, name):
        # The client is captured once, at construction time.
        self.client = classp.client
        self.classp = classp
        self.name = name

    def __call__(self, *args, **keywords):
        target = self.classp.name
        return self.client.call('m', target, self.name, args, keywords)

    def void(self, *args, **keywords):
        target = self.classp.name
        self.client.call_void('m', target, self.name, args, keywords)
115
class classproxy:
    """Local stand-in for an object published by the peer.

    The proxy keeps its own state ('client', 'name' and one methodproxy
    per advertised method) directly in the instance dictionary.  Any other
    attribute read becomes a 'g' (get) request and any attribute
    assignment becomes an 's' (set) request on the client.
    """

    def __init__(self, client, name, methods):
        # Write through __dict__ so that __setattr__ (which would issue a
        # remote call) is bypassed for the proxy's own bookkeeping.
        state = self.__dict__
        state['client'] = client
        state['name'] = name

        for method_name in methods:
            state[method_name] = methodproxy(self, method_name)

    def __getattr__(self, attr):
        # Only reached for names that are not in the instance dictionary.
        remote = self.client
        return remote.call('g', self.name, attr)

    def __setattr__(self, attr, value):
        remote = self.client
        remote.call_void('s', self.name, attr, value)
131
# Registry of published objects: name -> connect_function.
local_connect = {}

def publish(name, connect_function):
    """Register connect_function under name in the local_connect registry.

    A later registration under the same name replaces the earlier one.
    """
    local_connect.update({name: connect_function})
135
class socketFile:
    """File emulator based on a socket.  Provides only blocking semantics
    for now.

    Supplies the write(), read(n) and readline() methods used by the
    pickler and unpickler.  Received data that has not been consumed yet
    is kept in self.buffer.  Every socket failure, and an empty recv()
    result, is reported as connectionLost.
    """

    def __init__(self, socket):
        self.socket = socket
        self.buffer = ''

    def _recv(self, bytes):
        """Receive up to ``bytes`` bytes; raise connectionLost on failure
        or when recv() returns nothing."""
        try:
            r = self.socket.recv(bytes)
        except:
            # Any failure while receiving is treated as a lost connection.
            raise connectionLost()
        if not r:
            raise connectionLost()
        return r

    def write(self, string):
        """Send all of ``string``; raise connectionLost on failure.

        send() may accept only part of the data, so keep sending the
        remainder until nothing is left.
        """
        while string:
            try:
                sent = self.socket.send(string)
            except:
                raise connectionLost()
            if not sent:
                # No progress was made; give up rather than spin forever.
                raise connectionLost()
            string = string[sent:]

    def read(self, bytes):
        """Return exactly ``bytes`` bytes, blocking until they arrive."""
        x = bytes - len(self.buffer)
        while x > 0:
            self.buffer = self.buffer + self._recv(x)
            x = bytes - len(self.buffer)
        s = self.buffer[:bytes]
        self.buffer = self.buffer[bytes:]
        return s

    def readline(self):
        """Return the next line, including its trailing newline."""
        while 1:
            f = self.buffer.find('\n')
            if f >= 0:
                s = self.buffer[:f+1]
                self.buffer = self.buffer[f+1:]
                return s
            self.buffer = self.buffer + self._recv(1024)
174 self.buffer = self.buffer + self._recv(1024)
175
176
177class Connection (Thread):
178 debug = 0
179 def __init__(self, socket):
180 self.local_objects = {}
181 self.socket = socket
182 self.name = socket.getpeername()
183 self.socketfile = socketFile(socket)
184 self.queue = Queue(-1)
185 self.refuse_messages = 0
186 self.cmds = { 'm': self.r_meth,
187 'g': self.r_get,
188 's': self.r_set,
189 'o': self.r_geto,
190 'e': self.r_exc,
191 #'r' handled by rpc_loop
192 }
193
194 Thread.__init__(self)
195 self.setDaemon(1)
196 self.start()
197
198 def getobject(self, name):
199 methods = self.call( 'o', name )
200 if methods is None: return None
201 return classproxy(self, name, methods)
202
203 # close_hook is called from rpc_loop(), like a normal remote method
204 # invocation
205 def set_close_hook(self,hook): self.close_hook = hook
206
207 # shutdown_hook is called directly from the run() thread, and needs
208 # to be "thread safe"
209 def set_shutdown_hook(self,hook): self.shutdown_hook = hook
210
211 close_hook = None
212 shutdown_hook = None
213
214 def close(self):
215 self._shutdown()
216 self.refuse_messages = 1
217
218 def call(self, c, *args):
219 self.send( (c, args, 1 ) )
220 return self.rpc_loop( block = 1 )
221
222 def call_void(self, c, *args):
223 try:
224 self.send( (c, args, 0 ) )
225 except:
226 pass
227
228 # the following methods handle individual RPC calls:
229
230 def r_geto(self, obj):
231 c = local_connect.get(obj)
232 if not c: return None
233 o = c(self, self.name)
234 if not o: return None
235 self.local_objects[obj] = o
236 return getmethods(o.__class__)
237
238 def r_meth(self, obj, name, args, keywords):
239 return apply( getattr(self.local_objects[obj],name), args, keywords)
240
241 def r_get(self, obj, name):
242 return getattr(self.local_objects[obj],name)
243
244 def r_set(self, obj, name, value):
245 setattr(self.local_objects[obj],name,value)
246
247 def r_exc(self, e, v):
248 raise e, v
249
250 def rpc_exec(self, cmd, arg, ret):
251 if self.refuse_messages: return
252 if self.debug: print cmd,arg,ret
253 if ret:
254 try:
255 r=apply(self.cmds.get(cmd), arg)
256 self.send( ('r', r, 0) )
257 except:
258 try:
259 self.send( ('e', sys.exc_info()[:2], 0) )
260 except PicklingError:
261 self.send( ('e', (TypeError, 'Unpicklable exception.'), 0 ) )
262 else:
263 # we cannot report exceptions to the caller, so
264 # we report them in this process.
265 r=apply(self.cmds.get(cmd), arg)
266
267 # the following methods implement the RPC and message loops:
268
269 def rpc_loop(self, block=0):
270 if self.refuse_messages: raise connectionLost('(already closed)')
271 try:
272 while 1:
273 try:
274 cmd, arg, ret = self.queue.get( block )
275 except Empty:
276 return None
277 if cmd=='r': return arg
278 self.rpc_exec(cmd,arg,ret)
279 except connectionLost:
280 if self.close_hook:
281 self.close_hook()
282 self.close_hook = None
283 raise
284
285 def run(self):
286 try:
287 while 1:
288 data = self.recv()
289 self.queue.put( data )
290 except:
291 self.queue.put( ('e', sys.exc_info()[:2], 0) )
292
293 # The following send raw pickled data to the peer
294
295 def send(self, data):
296 try:
297 Pickler(self.socketfile,1).dump( data )
298 except connectionLost:
299 self._shutdown()
300 if self.shutdown_hook: self.shutdown_hook()
301 raise
302
303 def recv(self):
304 try:
305 return Unpickler(self.socketfile).load()
306 except connectionLost:
307 self._shutdown()
308 if self.shutdown_hook: self.shutdown_hook()
309 raise
310 except:
311 raise
312
313 def _shutdown(self):
314 try:
315 self.socket.shutdown(1)
316 self.socket.close()
317 except:
318 pass
319
320
class Server (Thread):
    """Listening endpoint that creates a Connection per accepted client.

    A daemon thread (run) accepts clients on the well-known port;
    rpc_loop() then services the pending messages of every connection.
    """
    default_port = 0x1D1E # "IDlE"

    def __init__(self, port=None, connection_hook=None):
        """Bind and listen on ``port`` (default_port when false) on all
        interfaces, then start the accept thread.

        connection_hook, when given, is called with each client's address
        from the accept thread; a false result refuses that client.
        Raises connectionLost when the listening socket cannot be set up.
        """
        self.connections = []
        self.port = port or self.default_port
        self.connection_hook = connection_hook

        s = None
        try:
            self.wellknown = s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(('', self.port))
            s.listen(3)
        except:
            # Do not leave a half-configured listening socket behind.
            if s is not None:
                try:
                    s.close()
                except socket.error:
                    pass
            raise connectionLost()

        Thread.__init__(self)
        self.setDaemon(1)
        self.start()

    def run(self):
        """Accept clients forever, wrapping each accepted one in a
        Connection."""
        s = self.wellknown
        while 1:
            conn, addr = s.accept()
            if self.connection_hook and not self.connection_hook(addr):
                self._discard(conn)
                continue
            try:
                c = Connection(conn)
            except socket.error:
                # Connection() asks the socket for its peer name, which
                # fails if the client is already gone; drop this client
                # and keep accepting others.
                self._discard(conn)
                continue
            self.connections.append( c )

    def _discard(self, conn):
        """Shut down and close a client socket, ignoring socket errors."""
        try:
            conn.shutdown(1)
        except:
            pass
        try:
            conn.close()
        except socket.error:
            pass

    def rpc_loop(self):
        """Process the pending messages of every connection without
        blocking.  Connections that report connectionLost are dropped
        from self.connections; the exception is not propagated."""
        # Iterate over a copy: the accept thread appends to the list and
        # lost connections are removed below.
        cns = self.connections[:]
        for c in cns:
            try:
                c.rpc_loop(block = 0)
            except connectionLost:
                if c in self.connections:
                    self.connections.remove(c)
360
361def Client(ip='127.0.0.1', port=None):
362 try:
363 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Nicholas Riley9a580c42000-09-24 06:29:50 +0000364 s.connect((ip,port or Server.default_port))
David Scherer7aced172000-08-15 01:13:23 +0000365 except socket.error, what:
366 raise connectionLost(str(what))
367 except:
368 raise connectionLost()
369 return Connection(s)