blob: 3f1a7462d4ea081954ba03de35eb1cb5225ea26d [file] [log] [blame]
"""RPC Implementation, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
def unpickle_code(ms):
    """Rebuild a code object from the marshal string *ms*.

    This is the reconstructor half of the copy_reg hook for code objects.
    The assertion checks that the marshalled payload really was a code
    object.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
46
def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns the ``(callable, args)`` pair expected from a copy_reg reduction
    function: calling ``unpickle_code`` on the marshalled bytes recreates
    the code object.
    """
    assert isinstance(co, types.CodeType)
    marshalled = marshal.dumps(co)
    return unpickle_code, (marshalled,)
51
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000052# XXX KBK 24Aug02 function pickling capability not used in Idle
53# def unpickle_function(ms):
54# return ms
Chui Tey5d2af632002-05-26 13:36:41 +000055
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000056# def pickle_function(fn):
#     assert isinstance(fn, types.FunctionType)
58# return `fn`
Chui Tey5d2af632002-05-26 13:36:41 +000059
# Register the reduction function so that pickle can serialize code objects.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

# Maximum number of bytes requested from the socket per recv() call.
BUFSIZE = 8 * 1024
64
class RPCServer(SocketServer.TCPServer):
    """TCPServer variant that connects out instead of listening.

    The usual bind/listen/accept sequence is replaced: server_bind() does
    nothing, server_activate() connects the server socket to *addr*, and
    get_request() hands that already connected socket to the handler.
    """

    def __init__(self, addr, handlerclass=None):
        """Create the server; *handlerclass* defaults to RPCHandler."""
        if handlerclass is None:
            handlerclass = RPCHandler
        SocketServer.TCPServer.__init__(self, addr, handlerclass)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"
        # Intentionally empty: the connecting side never binds.

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return self.socket, self.server_address
Chui Tey5d2af632002-05-26 13:36:41 +000088
# Default registry of locally served objects, keyed by object id.  Every
# SocketIO that is not given its own table shares this one.
objecttable = {}


class SocketIO:
    """Pickle-based request/response channel over a connected socket.

    Each message on the wire is a 4-byte little-endian length followed by a
    pickled ``(seq, payload)`` tuple.  A payload whose first item is "call"
    is a request and is executed by localcall(); anything else is treated
    as the response to the request with the same sequence number.

    The thread that constructs the instance does all the reading from the
    socket; other threads block on a condition variable until that thread
    files their response.
    """

    # Defaults so that debug() also works on a plain SocketIO; the
    # subclasses in this module set their own values.
    debugging = False
    location = "#?"

    # Last sequence number handed out; newseq() advances it by 2.
    nextseq = 0

    # Receive-side framing state.
    buffer = ""
    bufneed = 4
    bufstate = 0  # meaning: 0 => reading count; 1 => reading data

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap the connected socket *sock*.

        objtable  -- dict of served objects; defaults to the module-level
                     objecttable.
        debugging -- when not None, overrides the class-level flag.
        """
        self.mainthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.statelock = threading.Lock()
        self.responses = {}
        self.cvars = {}

    def close(self):
        """Close the socket, if any, and forget it."""
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def debug(self, *args):
        """Write a trace line to sys.__stderr__ when debugging is enabled."""
        if not self.debugging:
            return
        parts = [self.location, str(threading.currentThread().getName())]
        for a in args:
            parts.append(str(a))
        sys.__stderr__.write(" ".join(parts) + "\n")

    def register(self, oid, object):
        """Make *object* callable by the peer under the id *oid*."""
        self.objtable[oid] = object

    def unregister(self, oid):
        """Remove *oid* from the object table; unknown ids are ignored."""
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, request):
        """Execute a request received from the peer.

        *request* is ``("call", (oid, methodname, args, kwargs))``.
        Returns one of::

            ("OK", value)
            ("ERROR", message)                   -- the request was unusable
            ("EXCEPTION", (mod, name, args, tb)) -- the called method raised

        The pseudo methods "__methods__" and "__attributes__" return dicts
        of the target's callable and non-callable attribute names.
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not a sequence; ValueError: wrong number of items.
            return ("ERROR", "Bad request format")
        if how != "call":
            # Reported to the peer rather than asserted, so the check still
            # runs under -O and does not abort the read loop.
            return ("ERROR", "Unsupported request type: %s" % repr(how))
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %s" % repr(oid))
        obj = self.objtable[oid]
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %s" % repr(methodname))
        method = getattr(obj, methodname)
        try:
            ret = method(*args, **kwargs)
            if isinstance(ret, RemoteObject):
                ret = remoteref(ret)
            return ("OK", ret)
        except:
            # Deliberately catches everything: whatever the method raised
            # is described to the peer instead of propagating here.
            ##traceback.print_exc(file=sys.__stderr__)
            typ, val, tb = info = sys.exc_info()
            sys.last_type, sys.last_value, sys.last_traceback = info
            if isinstance(typ, type(Exception)):
                # Class exception
                mod = typ.__module__
                name = typ.__name__
                if issubclass(typ, Exception):
                    args = val.args
                else:
                    args = (str(val),)
            else:
                # User string exception
                mod = None
                name = typ
                if val is None:
                    val = ''
                args = str(val)
            tb = traceback.extract_tb(tb)
            self.debug("localcall:EXCEPTION: ", mod, name, args, tb)
            return ("EXCEPTION", (mod, name, args, tb))

    def remotecall(self, oid, methodname, args, kwargs):
        """Call a method on the peer and wait for its result."""
        self.debug("remotecall:")
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        """Send a call request and return its sequence number."""
        request = ("call", (oid, methodname, args, kwargs))
        seq = self.putrequest(request)
        self.debug(("asyncall:%d:" % seq), oid, methodname, args, kwargs)
        return seq

    def asyncreturn(self, seq):
        """Wait for the response to request *seq* and decode it."""
        response = self.getresponse(seq)
        self.debug(("asyncreturn:%d:" % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a ``(how, what)`` response into a return value or a raise.

        "OK"        -- return *what*.
        "EXCEPTION" -- re-raise the remote exception.  When its class can
                       be found locally an instance built from the
                       transmitted args is raised; otherwise a RuntimeError
                       naming the exception is raised.  The remote
                       traceback summary is kept in self.traceback.
        "ERROR"     -- raise RuntimeError(what).
        other       -- raise SystemError(how, what).
        """
        how, what = response
        if how == "OK":
            return what
        if how == "EXCEPTION":
            self.debug("decoderesponse: Internal EXCEPTION:", what)
            mod, name, args, tb = what
            self.traceback = tb
            if mod:  # not string exception
                try:
                    __import__(mod)
                    # __import__("a.b") returns package "a"; sys.modules
                    # holds the module that actually defines the class.
                    module = sys.modules[mod]
                    cls = getattr(module, name)
                except (ImportError, AttributeError):
                    pass
                else:
                    # instantiate the exception class and raise it
                    raise cls(*args)
                name = mod + "." + name
            # do the best we can: string exceptions cannot be raised on
            # current interpreters, so report the name and args instead.
            raise RuntimeError("%s: %s" % (name, args))
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        raise SystemError(how, what)

    def mainloop(self):
        """Serve incoming calls until the connection reaches EOF."""
        try:
            self.getresponse(None)
        except EOFError:
            pass

    def getresponse(self, myseq):
        """Return the response for *myseq*, proxifying "OK" values."""
        response = self._getresponse(myseq)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        """Replace RemoteProxy markers (also inside lists) by RPCProxy."""
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq):
        """Block until the response for *myseq* is available."""
        if threading.currentThread() is self.mainthread:
            # Main thread: does all reading of requests and responses
            while 1:
                response = self.pollresponse(myseq, None)
                if response is not None:
                    return response
        else:
            # Auxiliary thread: wait for notification from main thread
            cvar = threading.Condition(self.statelock)
            self.statelock.acquire()
            try:
                self.cvars[myseq] = cvar
                try:
                    while myseq not in self.responses:
                        cvar.wait()
                    response = self.responses[myseq]
                    del self.responses[myseq]
                finally:
                    del self.cvars[myseq]
            finally:
                # Never leave statelock held if something above raised.
                self.statelock.release()
            return response

    def putrequest(self, request):
        """Send *request* under a fresh sequence number and return it."""
        seq = self.newseq()
        self.putmessage((seq, request))
        return seq

    def newseq(self):
        """Return the next sequence number (the previous one plus 2)."""
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message* and send it with a 4-byte length prefix."""
        ##self.debug("putmessage: ", message)
        try:
            s = pickle.dumps(message)
        except:
            sys.__stderr__.write("Cannot pickle: %s\n" % repr(message))
            raise
        s = struct.pack("<i", len(s)) + s
        # send() may accept only part of the data; loop until all is sent.
        while len(s) > 0:
            n = self.sock.send(s)
            s = s[n:]

    def ioready(self, wait=0.0):
        """Return true if the socket is readable within *wait* seconds."""
        r, w, x = select.select([self.sock.fileno()], [], [], wait)
        return len(r)

    def pollpacket(self, wait=0.0):
        """Return one complete packet, or None if none is available yet.

        Performs at most one recv().  Raises EOFError when the peer has
        closed the connection or the socket reports an error.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            if not self.ioready(wait):
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        """Consume the 4-byte length prefix once it is fully buffered."""
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        """Return the packet body once buffered; otherwise return None."""
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait=0.0):
        """Return the next unpickled message, or None if none is ready."""
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(review): pickle.loads() on bytes read from the socket can
            # execute arbitrary code; this is only safe if the peer is
            # trusted.
            message = pickle.loads(packet)
        except:
            err = sys.__stderr__
            err.write("-----------------------\n")
            err.write("cannot unpickle packet: %s\n" % repr(packet))
            traceback.print_stack(file=err)
            err.write("-----------------------\n")
            raise
        return message

    def pollresponse(self, myseq, wait=0.0):
        """Process incoming messages; return the response for *myseq*.

        Incoming calls are executed and answered on the spot.  Responses
        for other sequence numbers are stored in self.responses and their
        waiting thread, if any, is notified.  Returns None when no further
        message is available.
        """
        # Loop until there's no more buffered input or until specific response
        while 1:
            message = self.pollmessage(wait)
            if message is None:
                return None
            # Only the first read may block; afterwards just drain.
            wait = 0.0
            seq, resq = message
            if resq[0] == "call":
                self.debug("call_localcall:%d:" % seq)
                response = self.localcall(resq)
                self.putmessage((seq, response))
            elif seq == myseq:
                return resq
            else:
                self.statelock.acquire()
                try:
                    self.responses[seq] = resq
                    cv = self.cvars.get(seq)
                    if cv is not None:
                        cv.notify()
                finally:
                    self.statelock.release()

#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000364
class RemoteObject:
    """Token mix-in class.

    It carries no behaviour of its own and exists only so that instances
    can be recognised with isinstance().
    """
368
def remoteref(obj):
    """Publish *obj* in the module object table and return a marker for it.

    The object is stored under its id(); the returned RemoteProxy carries
    only that id.
    """
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
373
class RemoteProxy:
    """Plain record holding the object id of a remotely served object."""

    def __init__(self, oid):
        # The id is the only state carried by the marker.
        self.oid = oid
378
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    """Request handler that runs the SocketIO main loop on the connection."""

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        """Attach to *sock* and publish this handler on the server object."""
        # Must be done first: BaseRequestHandler.__init__ is called last
        # and the SocketIO state has to exist before it runs.
        svr.current_handler = self  ## cgt xxx
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by SocketServer"
        self.mainloop()

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
395
class RPCClient(SocketIO):
    """Listening end of the RPC link.

    The constructor only opens the listening socket.  accept() must be
    called before any I/O, because it is accept() that runs
    SocketIO.__init__ on the connected socket.
    """

    debugging = False
    location = "#C"  # Client

    nextseq = 1  # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Bind a listening socket to *address* with a backlog of 1."""
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and start using it for RPC.

        Only peers connecting from 127.0.0.1 are accepted.  Any other peer
        has its socket closed and socket.error is raised.
        """
        working_sock, address = self.listening_sock.accept()
        if address[0] == '127.0.0.1':
            sys.__stderr__.write("Idle accepted connection from  %s\n"
                                 % (address,))
            SocketIO.__init__(self, working_sock)
        else:
            sys.__stderr__.write("Invalid host:  %s\n" % (address,))
            # Do not leak the rejected connection.
            working_sock.close()
            raise socket.error("Invalid host: %s" % (address,))

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
421
class RPCProxy:
    """Local stand-in for an object served by the peer.

    Attribute access is resolved lazily: the peer is asked once for the
    object's method names and, if needed, once for its data attribute
    names.  Methods come back as MethodProxy objects; data attributes are
    fetched from the peer on every access.
    """

    # Cached name dicts; None until first fetched from the peer.
    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        """*sockio* is the SocketIO to talk through, *oid* the remote id."""
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Resolve *name* on the remote object.

        Raises AttributeError when the peer reports neither a method nor a
        data attribute of that name.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name not in self.__attributes:
            raise AttributeError(name)
        # Fetch the value from the peer; without this the lookup would
        # silently evaluate to None.
        # NOTE(review): relies on the remote object exposing
        # __getattribute__ (new-style objects) -- confirm for served types.
        return self.sockio.remotecall(self.oid, "__getattribute__",
                                      (name,), {})
    __getattr__.DebuggerStepThrough = 1

    def __getattributes(self):
        """Fetch and cache the remote object's data attribute names."""
        self.__attributes = self.sockio.remotecall(self.oid,
                                                   "__attributes__", (), {})

    def __getmethods(self):
        """Fetch and cache the remote object's method names."""
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})
449
450def _getmethods(obj, methods):
451 # Helper to get a list of methods from an object
452 # Adds names to dictionary argument 'methods'
453 for name in dir(obj):
454 attr = getattr(obj, name)
455 if callable(attr):
456 methods[name] = 1
457 if type(obj) == types.InstanceType:
458 _getmethods(obj.__class__, methods)
459 if type(obj) == types.ClassType:
460 for super in obj.__bases__:
461 _getmethods(super, methods)
462
463def _getattributes(obj, attributes):
464 for name in dir(obj):
465 attr = getattr(obj, name)
466 if not callable(attr):
467 attributes[name] = 1
468
class MethodProxy:
    """Callable that forwards its arguments to one remote method."""

    def __init__(self, sockio, oid, name):
        """Remember the channel, the remote object id and the method name."""
        self.sockio = sockio
        self.oid = oid
        self.name = name

    def __call__(self, *args, **kwargs):
        """Invoke the remote method and return whatever remotecall returns."""
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
479
480#
481# Self Test
482#
483
484def testServer(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000485 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000486 class RemotePerson:
487 def __init__(self,name):
488 self.name = name
489 def greet(self, name):
490 print "(someone called greet)"
491 print "Hello %s, I am %s." % (name, self.name)
492 print
493 def getName(self):
494 print "(someone called getName)"
495 print
496 return self.name
497 def greet_this_guy(self, name):
498 print "(someone called greet_this_guy)"
499 print "About to greet %s ..." % name
500 remote_guy = self.server.current_handler.get_remote_proxy(name)
501 remote_guy.greet("Thomas Edison")
502 print "Done."
503 print
504
505 person = RemotePerson("Thomas Edison")
506 svr = RPCServer(addr)
507 svr.register('thomas', person)
508 person.server = svr # only required if callbacks are used
509
510 # svr.serve_forever()
511 svr.handle_request() # process once only
512
513def testClient(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000514 "demonstrates RPC Client"
515 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000516 import time
517 clt=RPCClient(addr)
518 thomas = clt.get_remote_proxy("thomas")
519 print "The remote person's name is ..."
520 print thomas.getName()
521 # print clt.remotecall("thomas", "getName", (), {})
522 print
523 time.sleep(1)
524 print "Getting remote thomas to say hi..."
525 thomas.greet("Alexander Bell")
526 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
527 print "Done."
528 print
529 time.sleep(2)
Chui Tey5d2af632002-05-26 13:36:41 +0000530 # demonstrates remote server calling local instance
531 class LocalPerson:
532 def __init__(self,name):
533 self.name = name
534 def greet(self, name):
535 print "You've greeted me!"
536 def getName(self):
537 return self.name
538 person = LocalPerson("Alexander Bell")
539 clt.register("alexander",person)
540 thomas.greet_this_guy("alexander")
541 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
542
def test():
    """Run the self-test: server side when invoked with the single
    argument '-server', client side otherwise."""
    endpoint = ("localhost", 8833)
    run_server = len(sys.argv) == 2 and sys.argv[1] == '-server'
    if run_server:
        testServer(endpoint)
    else:
        testClient(endpoint)
550
if __name__ == "__main__":
    # Run the self-test only when executed as a script.
    test()
553
554