blob: eeb1b4efd44b00ad9a54880c6dc7f9995e9e2bc5 [file] [log] [blame]
"""RPC Implementation, originally written for the Python Idle IDE

For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection.  Since Idle has
only one client per server, this was not a limitation.

   +---------------------------------+ +-------------+
   | SocketServer.BaseRequestHandler | | SocketIO    |
   +---------------------------------+ +-------------+
                   ^                   | register()  |
                   |                   | unregister()|
                   |                   +-------------+
                   |                      ^  ^
                   |                      |  |
                   | + -------------------+  |
                   | |                       |
   +-------------------------+        +-----------------+
   | RPCHandler              |        | RPCClient       |
   | [attribute of RPCServer]|        |                 |
   +-------------------------+        +-----------------+

The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.

See the Idle run.main() docstring for further information on how this was
accomplished in Idle.

"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
def unpickle_code(ms):
    """Rebuild a code object from its marshalled form *ms*.

    The assertion rejects marshal data that decodes to anything other
    than a code object.
    """
    co = marshal.loads(ms)
    assert isinstance(co, types.CodeType)
    return co
46
def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns a (callable, args) pair: calling unpickle_code with the
    marshalled form of *co* recreates the code object.
    """
    assert isinstance(co, types.CodeType)
    ms = marshal.dumps(co)
    return unpickle_code, (ms,)
51
# XXX KBK 24Aug02 function pickling capability not used in Idle
# def unpickle_function(ms):
#     return ms

# def pickle_function(fn):
#     assert isinstance(fn, types.FunctionType)
#     return repr(fn)

# Teach pickle to transport code objects by marshalling them.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

# Number of bytes requested from the socket per recv() call.
BUFSIZE = 8*1024
64
class RPCServer(SocketServer.TCPServer):
    """TCPServer whose socket connects out to its peer instead of listening.

    bind() is skipped, activation performs connect(), and get_request()
    hands back that single connected socket.
    """

    def __init__(self, addr, handlerclass=None):
        """Set up the server for *addr*; *handlerclass* defaults to RPCHandler."""
        if handlerclass is None:
            handlerclass = RPCHandler
        SocketServer.TCPServer.__init__(self, addr, handlerclass)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"
        pass

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return self.socket, self.server_address
Chui Tey5d2af632002-05-26 13:36:41 +000088
# Default registry mapping object ids to the local objects exposed over RPC;
# used by every SocketIO that is created without its own objtable.
objecttable = {}
90
class SocketIO:
    """Mix-in implementing the RPC message protocol over a connected socket.

    Every message is a pickled (seq, payload) pair preceded by a 4-byte
    little-endian length.  A payload whose first item is "call" is a
    request and is executed locally; anything else is a response that is
    matched to its waiting caller by sequence number.
    """

    debugging = 0

    def __init__(self, sock, objtable=None, debugging=None):
        """Attach to the connected socket *sock*.

        *objtable* maps object ids to local objects and defaults to the
        module-level objecttable.  The creating thread is recorded as the
        main thread, the only one that reads from the socket.
        """
        self.mainthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.statelock = threading.Lock()
        self.responses = {}   # seq -> response parked for an auxiliary thread
        self.cvars = {}       # seq -> Condition of the thread waiting on it

    def close(self):
        """Close the socket, if any, and forget it."""
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def debug(self, *args):
        """Write the thread name and *args* to the real stderr when debugging."""
        if not self.debugging:
            return
        parts = [str(threading.currentThread().getName())]
        for a in args:
            parts.append(str(a))
        sys.__stderr__.write(" ".join(parts) + "\n")

    def register(self, oid, object):
        """Expose *object* to the peer under the id *oid*."""
        self.objtable[oid] = object

    def unregister(self, oid):
        """Stop exposing *oid*; unknown ids are ignored."""
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, request):
        """Execute a request from the peer and return a (how, what) response.

        *how* is "OK" with the result, "ERROR" with a message for a
        malformed or unresolvable request, or "EXCEPTION" with
        (module, name, args, extracted traceback) when the call raised.
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not a sequence; ValueError: wrong number of items.
            return ("ERROR", "Bad request format")
        assert how == "call"
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %r" % (oid,))
        obj = self.objtable[oid]
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %r" % (methodname,))
        method = getattr(obj, methodname)
        try:
            ret = method(*args, **kwargs)
            if isinstance(ret, RemoteObject):
                ret = remoteref(ret)
            return ("OK", ret)
        except:
            # Deliberately catches everything: the failure is reported to
            # the peer instead of propagating here.
            ##traceback.print_exc(file=sys.__stderr__)
            typ, val, tb = info = sys.exc_info()
            sys.last_type, sys.last_value, sys.last_traceback = info
            if isinstance(typ, type(Exception)):
                # Class exceptions
                mod = typ.__module__
                name = typ.__name__
                if issubclass(typ, Exception):
                    exc_args = val.args
                else:
                    exc_args = (str(val),)
            else:
                # String exceptions
                mod = None
                name = typ
                exc_args = (str(val),)
            tb = traceback.extract_tb(tb)
            return ("EXCEPTION", (mod, name, exc_args, tb))

    def remotecall(self, oid, methodname, args, kwargs):
        """Call *methodname* on remote object *oid* and wait for the result."""
        self.debug("remotecall:", oid, methodname, args, kwargs)
        seq = self.asynccall(oid, methodname, args, kwargs)
        ret = self.asyncreturn(seq)
        self.debug("return:", ret)
        return ret

    def asynccall(self, oid, methodname, args, kwargs):
        """Send a call request and return its sequence number."""
        self.debug("asyncall:", oid, methodname, args, kwargs)
        request = ("call", (oid, methodname, args, kwargs))
        seq = self.putrequest(request)
        return seq

    def asyncreturn(self, seq):
        """Wait for the response to request *seq* and decode it."""
        response = self.getresponse(seq)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Return the value of an "OK" response or raise for a failure.

        "EXCEPTION" re-raises the remote exception class when it can be
        imported locally, otherwise RuntimeError(name, args).  "ERROR"
        raises RuntimeError; any other tag raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "EXCEPTION":
            mod, name, args, tb = what
            self.traceback = tb
            if mod: # not string exception
                # NOTE(review): module and class names come from the peer;
                # importing and instantiating them trusts the remote end.
                try:
                    __import__(mod)
                    module = sys.modules[mod]
                except ImportError:
                    pass
                else:
                    try:
                        cls = getattr(module, name)
                    except AttributeError:
                        pass
                    else:
                        # Use the class found in sys.modules[mod]:
                        # __import__("a.b") returns package "a", so looking
                        # the name up on its result fails for dotted modules.
                        raise cls(*args)
                name = mod + "." + name
            # do the best we can: the original class is unavailable (or the
            # remote side raised a string exception), so report its name and
            # arguments in a RuntimeError.
            raise RuntimeError(name, args)
        if how == "ERROR":
            raise RuntimeError(what)
        raise SystemError(how, what)

    def mainloop(self):
        """Serve incoming requests until the connection reaches EOF."""
        try:
            self.getresponse(None)
        except EOFError:
            pass

    def getresponse(self, myseq):
        """Return the response for *myseq*, proxifying an "OK" payload."""
        response = self._getresponse(myseq)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        """Replace RemoteProxy tokens (also inside lists) by live RPCProxy objects."""
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq):
        """Block until the response numbered *myseq* is available."""
        if threading.currentThread() is self.mainthread:
            # Main thread: does all reading of requests and responses
            while 1:
                response = self.pollresponse(myseq, None)
                if response is not None:
                    return response
        else:
            # Auxiliary thread: wait for notification from main thread
            cvar = threading.Condition(self.statelock)
            self.statelock.acquire()
            try:
                self.cvars[myseq] = cvar
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                del self.responses[myseq]
                del self.cvars[myseq]
            finally:
                # Never leave the shared lock held if waiting is interrupted.
                self.statelock.release()
            return response

    def putrequest(self, request):
        """Send *request* under a fresh sequence number and return that number."""
        seq = self.newseq()
        self.putmessage((seq, request))
        return seq

    nextseq = 0

    def newseq(self):
        """Return the next sequence number; numbers advance in steps of two."""
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message* and send it with its 4-byte length prefix."""
        try:
            s = pickle.dumps(message)
        except:
            sys.__stderr__.write("Cannot pickle: %r\n" % (message,))
            raise
        s = struct.pack("<i", len(s)) + s
        # send() may accept only part of the data; loop until all is written.
        while len(s) > 0:
            n = self.sock.send(s)
            s = s[n:]

    def ioready(self, wait=0.0):
        """Return true if the socket is readable within *wait* seconds."""
        r, w, x = select.select([self.sock.fileno()], [], [], wait)
        return len(r)

    buffer = ""
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait=0.0):
        """Return the next complete packet, or None if it has not arrived yet.

        Raises EOFError when the socket fails or the peer closed it.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            if not self.ioready(wait):
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        """Consume the 4-byte length header once it is fully buffered."""
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        """Return the packet body once buffered (else None) and reset the state."""
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait=0.0):
        """Return the next unpickled message, or None if none is complete."""
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(review): unpickling executes whatever the peer sends;
            # this is only acceptable when the peer is trusted.
            message = pickle.loads(packet)
        except:
            err = sys.__stderr__
            err.write("-----------------------\n")
            err.write("cannot unpickle packet: %r\n" % (packet,))
            traceback.print_stack(file=err)
            err.write("-----------------------\n")
            raise
        return message

    def pollresponse(self, myseq, wait=0.0):
        """Process incoming messages; return the response for *myseq* if seen.

        Requests are executed and answered on the spot; responses for other
        sequence numbers are parked for the thread waiting on them.  Returns
        None once no further complete message is available.
        """
        # Loop until there's no more buffered input or until specific response
        while 1:
            message = self.pollmessage(wait)
            if message is None:
                return None
            # Only the first read may block; afterwards just drain the buffer.
            wait = 0.0
            seq, resq = message
            if resq[0] == "call":
                response = self.localcall(resq)
                self.putmessage((seq, response))
                continue
            elif seq == myseq:
                return resq
            else:
                self.statelock.acquire()
                try:
                    self.responses[seq] = resq
                    cv = self.cvars.get(seq)
                    if cv is not None:
                        cv.notify()
                finally:
                    self.statelock.release()
                continue
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000360
361#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000362
class RemoteObject:
    """Token mix-in class.

    A call result that is an instance of this class is sent to the peer
    as a remote reference rather than by value.
    """
    pass
366
def remoteref(obj):
    """Register *obj* in objecttable under id(obj) and return a RemoteProxy for it."""
    oid = id(obj)
    objecttable[oid] = obj
    return RemoteProxy(oid)
371
class RemoteProxy:
    """Token standing in for a remote object; carries only its object id."""

    def __init__(self, oid):
        self.oid = oid
376
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    """Request handler that speaks the SocketIO protocol on the request socket."""

    debugging = 0

    def __init__(self, sock, addr, svr):
        """Publish this handler on the server, then run the request.

        The SocketIO state must be initialised before
        BaseRequestHandler.__init__ is called.
        """
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by SocketServer"
        self.mainloop()

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object *oid*."""
        return RPCProxy(self, oid)
392
class RPCClient(SocketIO):
    """Listening end of the RPC link; the server process connects to it."""

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Bind a listening socket to *address*; accept() completes the setup."""
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and initialise the SocketIO state on it.

        Only peers connecting from 127.0.0.1 are accepted; any other peer
        is disconnected and socket.error is raised.
        """
        working_sock, address = self.listening_sock.accept()
        if address[0] == '127.0.0.1':
            sys.__stderr__.write("Idle accepted connection from  %s\n"
                                 % (address,))
            SocketIO.__init__(self, working_sock)
        else:
            sys.__stderr__.write("Invalid host:  %s\n" % (address,))
            # Do not leave the rejected connection open.
            working_sock.close()
            raise socket.error

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object *oid*."""
        return RPCProxy(self, oid)
415
class RPCProxy:
    """Local stand-in for an object that lives on the other side of the link.

    The remote object's method and attribute names are fetched on first
    use and cached on the proxy.
    """

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Resolve *name* against the remote object.

        Returns a MethodProxy for a remote method, or the current value of
        a remote data attribute.  Raises AttributeError for unknown names.
        The value is fetched by calling "__getattribute__" on the remote
        object, so the remote object must provide that method.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name not in self.__attributes:
            raise AttributeError(name)
        # Fetch the value instead of falling through and returning None.
        return self.sockio.remotecall(self.oid, "__getattribute__",
                                      (name,), {})
    __getattr__.DebuggerStepThrough=1

    def __getattributes(self):
        """Cache the remote object's non-callable attribute names."""
        self.__attributes = self.sockio.remotecall(self.oid,
                                                   "__attributes__", (), {})

    def __getmethods(self):
        """Cache the remote object's callable attribute names."""
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})
443
444def _getmethods(obj, methods):
445 # Helper to get a list of methods from an object
446 # Adds names to dictionary argument 'methods'
447 for name in dir(obj):
448 attr = getattr(obj, name)
449 if callable(attr):
450 methods[name] = 1
451 if type(obj) == types.InstanceType:
452 _getmethods(obj.__class__, methods)
453 if type(obj) == types.ClassType:
454 for super in obj.__bases__:
455 _getmethods(super, methods)
456
457def _getattributes(obj, attributes):
458 for name in dir(obj):
459 attr = getattr(obj, name)
460 if not callable(attr):
461 attributes[name] = 1
462
class MethodProxy:
    """Callable that forwards a call to one named method of a remote object."""

    def __init__(self, sockio, oid, name):
        self.sockio = sockio
        self.oid = oid
        self.name = name

    def __call__(self, *args, **kwargs):
        """Perform the remote call and return its result."""
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
473
474#
475# Self Test
476#
477
def testServer(addr):
    """Demonstrate the RPC server side.

    RPCServer connects out to *addr*, so the client side must already be
    listening there.  One request (the whole conversation) is handled.
    """
    class RemotePerson:
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print("(someone called greet)")
            print("Hello %s, I am %s." % (name, self.name))
            print("")
        def getName(self):
            print("(someone called getName)")
            print("")
            return self.name
        def greet_this_guy(self, name):
            print("(someone called greet_this_guy)")
            print("About to greet %s ..." % name)
            remote_guy = self.server.current_handler.get_remote_proxy(name)
            remote_guy.greet("Thomas Edison")
            print("Done.")
            print("")

    person = RemotePerson("Thomas Edison")
    svr = RPCServer(addr)
    # RPCServer itself has no register(); the handler it creates uses the
    # module-level objecttable by default, so the object is published there.
    objecttable['thomas'] = person
    person.server = svr # only required if callbacks are used

    # svr.serve_forever()
    svr.handle_request()  # process once only
506
def testClient(addr):
    "demonstrates RPC Client"
    import time
    clt=RPCClient(addr)
    # RPCClient() only listens; accept() waits for the server process to
    # connect and initialises the SocketIO state used by every call below.
    clt.accept()
    thomas = clt.get_remote_proxy("thomas")
    print("The remote person's name is ...")
    print(thomas.getName())
    # print clt.remotecall("thomas", "getName", (), {})
    print("")
    time.sleep(1)
    print("Getting remote thomas to say hi...")
    thomas.greet("Alexander Bell")
    #clt.remotecall("thomas","greet",("Alexander Bell",), {})
    print("Done.")
    print("")
    time.sleep(2)
    # demonstrates remote server calling local instance
    class LocalPerson:
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print("You've greeted me!")
        def getName(self):
            return self.name
    person = LocalPerson("Alexander Bell")
    clt.register("alexander",person)
    thomas.greet_this_guy("alexander")
    # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
536
def test():
    """Run the self test: the server side if invoked with the single
    argument '-server', otherwise the client side."""
    addr=("localhost",8833)
    if len(sys.argv) == 2 and sys.argv[1] == '-server':
        testServer(addr)
        return
    testClient(addr)
544
# Run the self test only when executed as a script.
if __name__ == '__main__':
    test()
547
548