blob: bb132257da08d0362a89a37acb3641af3354a8bc [file] [log] [blame]
"""RPC Implementation, originally written for the Python Idle IDE

For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection.  Since Idle has
only one client per server, this was not a limitation.

    +---------------------------------+ +-------------+
    | SocketServer.BaseRequestHandler | | SocketIO    |
    +---------------------------------+ +-------------+
                    ^                   | register()  |
                    |                   | unregister()|
                    |                   +-------------+
                    |                      ^  ^
                    |                      |  |
                    | + -------------------+  |
                    | |                       |
    +-------------------------+      +-----------------+
    | RPCHandler              |      | RPCClient       |
    | [attribute of RPCServer]|      |                 |
    +-------------------------+      +-----------------+

The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.

See the Idle run.main() docstring for further information on how this was
accomplished in Idle.

"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
def unpickle_code(ms):
    """Rebuild a code object from its marshalled string *ms*.

    This is the reconstructor half of the code-object pickling support.
    Raises AssertionError if the marshalled value is not a code object.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
46
def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns the (callable, args) pair expected of a reduction function:
    unpickle_code together with a 1-tuple holding the marshalled code.
    Raises AssertionError if *co* is not a code object.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
51
# XXX KBK 24Aug02 function pickling capability not used in Idle
#  def unpickle_function(ms):
#      return ms

#  def pickle_function(fn):
#      assert isinstance(fn, types.FunctionType)
#      return `fn`

# Register the reduction function so that code objects can be pickled.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

# Number of bytes requested from the socket by each recv() call.
BUFSIZE = 8 * 1024
64
class RPCServer(SocketServer.TCPServer):
    """TCPServer whose socket connects out instead of listening.

    The usual bind()/listen()/accept() phases are overridden so that the
    server socket itself becomes the single, already connected request.
    """

    def __init__(self, addr, handlerclass=None):
        # RPCHandler is the handler used when the caller does not pick one.
        handler = handlerclass
        if handler is None:
            handler = RPCHandler
        SocketServer.TCPServer.__init__(self, addr, handler)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        request = (self.socket, self.server_address)
        return request
Chui Tey5d2af632002-05-26 13:36:41 +000088
objecttable = {}    # default oid -> object registry used when none is supplied


class SocketIO:
    """Pickle-based request/response messaging over a connected socket.

    Every message is a pickled (seq, payload) tuple preceded by a 4-byte
    little-endian length.  A payload whose first item is "call" is a request
    that localcall() runs against the object table; any other payload is
    treated as the response to the request with the same sequence number.

    The thread that created the instance (self.mainthread) does all reading
    from the socket.  Other threads wait on a condition variable until the
    main thread has stored their response in self.responses.
    """

    # Fallbacks so that debug() also works when the class is used without a
    # subclass that sets them.
    debugging = False
    location = "#?"

    # newseq() hands out nextseq + 2, nextseq + 4, ...; a subclass can start
    # from a different value to get a different parity.
    nextseq = 0

    # State of the incremental packet reader (see pollpacket).
    buffer = ""     # data received but not yet consumed
    bufneed = 4     # number of bytes needed to finish the current stage
    bufstate = 0    # meaning: 0 => reading count; 1 => reading data

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap the connected socket *sock*.

        objtable  -- oid -> object mapping served to the peer; defaults to
                     the module-level objecttable.
        debugging -- when not None, overrides the class-level debugging flag.
        """
        self.mainthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.statelock = threading.Lock()   # guards responses and cvars
        self.responses = {}                 # seq -> response for aux threads
        self.cvars = {}                     # seq -> Condition of the waiter

    def close(self):
        "Close the socket; calling close() again is harmless."
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def debug(self, *args):
        "Write location, thread name and *args* to sys.__stderr__ if debugging."
        if not self.debugging:
            return
        parts = [self.location, str(threading.currentThread().getName())]
        parts = parts + [str(a) for a in args]
        sys.__stderr__.write(" ".join(parts) + "\n")

    def register(self, oid, object):
        "Make *object* callable by the peer under the id *oid*."
        self.objtable[oid] = object

    def unregister(self, oid):
        "Remove *oid* from the object table; unknown ids are ignored."
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, request):
        """Execute a "call" request and return the (how, what) response.

        how is "OK" (what is the result), "ERROR" (what is a message about
        a malformed or unserviceable request) or "EXCEPTION" (what is
        (module, name, args, extracted traceback) of the exception raised
        by the called method).
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not a sequence; ValueError: wrong number of items.
            return ("ERROR", "Bad request format")
        assert how == "call"
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %s" % repr(oid))
        obj = self.objtable[oid]
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %s" % repr(methodname))
        method = getattr(obj, methodname)
        try:
            ret = method(*args, **kwargs)
            if isinstance(ret, RemoteObject):
                ret = remoteref(ret)
            return ("OK", ret)
        except:
            # Deliberately catches everything: whatever the method raised
            # is reported to the peer instead of ending the read loop.
            typ, val, tb = info = sys.exc_info()
            sys.last_type, sys.last_value, sys.last_traceback = info
            if isinstance(typ, type(Exception)):
                # Class exception
                mod = typ.__module__
                name = typ.__name__
                if issubclass(typ, Exception):
                    args = val.args
                else:
                    args = (str(val),)
            else:
                # User string exception
                mod = None
                name = typ
                if val is None:
                    val = ''
                args = str(val)
            tb = traceback.extract_tb(tb)
            self.debug("localcall:EXCEPTION: ", mod, name, args, tb)
            return ("EXCEPTION", (mod, name, args, tb))

    def remotecall(self, oid, methodname, args, kwargs):
        "Call a method of a peer object and return (or raise) its outcome."
        self.debug("remotecall:")
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        "Send a call request and return its sequence number."
        request = ("call", (oid, methodname, args, kwargs))
        seq = self.putrequest(request)
        self.debug(("asyncall:%d:" % seq), oid, methodname, args, kwargs)
        return seq

    def asyncreturn(self, seq):
        "Wait for the response to request *seq* and decode it."
        response = self.getresponse(seq)
        self.debug(("asyncreturn:%d:" % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a (how, what) response into a return value or an exception.

        "OK" returns what.  "EXCEPTION" stores the remote traceback in
        self.traceback and raises the named exception class built from the
        transmitted args; if that class cannot be found, RuntimeError is
        raised with the name and args in its message.  "ERROR" raises
        RuntimeError(what); anything else raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "EXCEPTION":
            self.debug("decoderesponse: Internal EXCEPTION:", what)
            mod, name, args, tb = what
            self.traceback = tb
            if mod: # not string exception
                try:
                    __import__(mod)
                    module = sys.modules[mod]
                except ImportError:
                    pass
                else:
                    try:
                        cls = getattr(module, name)
                    except AttributeError:
                        pass
                    else:
                        # Use the class looked up in sys.modules: the value
                        # returned by __import__() is the top-level package,
                        # which is wrong for dotted module names.
                        raise cls(*args)
                name = mod + "." + name
            # do the best we can: the exception cannot be rebuilt, so report
            # it by name (string exceptions are no longer raisable).
            raise RuntimeError("%s: %s" % (name, args))
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        raise SystemError(how, what)

    def mainloop(self):
        "Serve incoming requests until the connection reports EOF."
        try:
            self.getresponse(None)
        except EOFError:
            pass

    def getresponse(self, myseq):
        "Return the response for *myseq*, proxifying the value of an OK."
        response = self._getresponse(myseq)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        "Replace RemoteProxy markers (also inside lists) by RPCProxy objects."
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq):
        "Obtain the raw response for *myseq* (see the class docstring)."
        if threading.currentThread() is self.mainthread:
            # Main thread: does all reading of requests and responses
            while 1:
                response = self.pollresponse(myseq, None)
                if response is not None:
                    return response
        # Auxiliary thread: wait for notification from main thread
        cvar = threading.Condition(self.statelock)
        self.statelock.acquire()
        try:
            self.cvars[myseq] = cvar
            while myseq not in self.responses:
                cvar.wait()
            response = self.responses[myseq]
            del self.responses[myseq]
            del self.cvars[myseq]
        finally:
            # Never leave the state lock held if something above fails.
            self.statelock.release()
        return response

    def putrequest(self, request):
        "Send *request* under a fresh sequence number and return the number."
        seq = self.newseq()
        self.putmessage((seq, request))
        return seq

    def newseq(self):
        "Return the next sequence number (advances in steps of 2)."
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        "Pickle *message* and send it with a 4-byte length prefix."
        try:
            s = pickle.dumps(message)
        except:
            sys.__stderr__.write("Cannot pickle: %s\n" % repr(message))
            raise
        s = struct.pack("<i", len(s)) + s
        # send() may accept only part of the data; loop until all is gone.
        while len(s) > 0:
            n = self.sock.send(s)
            s = s[n:]

    def ioready(self, wait=0.0):
        "Return true if the socket is readable within *wait* seconds."
        r, w, x = select.select([self.sock.fileno()], [], [], wait)
        return len(r)

    def pollpacket(self, wait=0.0):
        """Return the next complete packet, or None if none is available.

        Reads from the socket at most once per call.  Raises EOFError when
        the socket fails or the peer has closed the connection.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            if not self.ioready(wait):
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        "Consume the 4-byte length prefix once it has been received."
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        "Return the packet body once complete and reset the reader state."
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet
        return None

    def pollmessage(self, wait=0.0):
        "Return the next unpickled message, or None if no packet is ready."
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(security): unpickling can execute arbitrary code; the
            # peer on the other end of the socket must be trusted.
            message = pickle.loads(packet)
        except:
            err = sys.__stderr__
            err.write("-----------------------\n")
            err.write("cannot unpickle packet: %s\n" % repr(packet))
            traceback.print_stack(file=err)
            err.write("-----------------------\n")
            raise
        return message

    def pollresponse(self, myseq, wait=0.0):
        """Process incoming messages; return the response for *myseq*.

        Incoming calls are executed and answered on the spot.  Responses
        for other sequence numbers are stored for the thread waiting on
        them.  Returns None as soon as no further message is available.
        """
        # Loop until there's no more buffered input or until specific response
        while 1:
            message = self.pollmessage(wait)
            if message is None:
                return None
            # Only the first read may block; afterwards just drain.
            wait = 0.0
            seq, resq = message
            if resq[0] == "call":
                self.debug("call_localcall:%d:" % seq)
                response = self.localcall(resq)
                self.putmessage((seq, response))
            elif seq == myseq:
                return resq
            else:
                self.statelock.acquire()
                try:
                    self.responses[seq] = resq
                    cv = self.cvars.get(seq)
                    if cv is not None:
                        cv.notify()
                finally:
                    self.statelock.release()

#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000364
class RemoteObject:
    """Token mix-in class.

    It defines nothing itself; SocketIO.localcall() tests call results with
    isinstance(..., RemoteObject) and sends a remote reference for those.
    """
368
def remoteref(obj):
    """Register *obj* in objecttable and return a RemoteProxy naming it.

    The object's id() is used as its key in the table.
    """
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
373
class RemoteProxy:
    "Value object that carries only the object id (oid) of a remote object."

    def __init__(self, oid):
        # Key under which the referenced object is registered.
        self.oid = oid
378
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    "Request handler that serves RPC messages on the request socket."

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        # Publish this handler on the server object before anything else.
        svr.current_handler = self ## cgt xxx
        # SocketIO state is set up before the base handler is initialised.
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by SocketServer"
        self.mainloop()

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer object registered as *oid*."
        proxy = RPCProxy(self, oid)
        return proxy
395
class RPCClient(SocketIO):
    """Listening end of the RPC connection.

    The constructor only opens the listening socket.  accept() must be
    called to take the incoming connection and initialise the SocketIO
    machinery before any message can be exchanged.
    """

    debugging = False
    location = "#C"  # Client

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        "Create a socket bound to *address* and listening for one connection."
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and start using it for RPC traffic.

        Only connections from 127.0.0.1 are accepted; any other peer is
        closed and socket.error is raised.
        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            sys.__stderr__.write("** Connection request from  %s\n"
                                 % (address,))
        if address[0] == '127.0.0.1':
            SocketIO.__init__(self, working_sock)
        else:
            sys.__stderr__.write("** Invalid host:  %s\n" % (address,))
            # Drop the rejected connection right away instead of leaving it
            # open until the socket object happens to be collected.
            working_sock.close()
            raise socket.error

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer object registered as *oid*."
        return RPCProxy(self, oid)
422
class RPCProxy:
    """Local stand-in for the remote object registered as *oid*.

    The remote object's method and attribute name tables are fetched on
    first use through sockio.remotecall() and then kept on the instance.
    """

    __methods = None        # name table from "__methods__", None until fetched
    __attributes = None     # name table from "__attributes__", same

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Return a MethodProxy for a remote method called *name*.

        NOTE: for a name found only in the attribute table the result is
        None; the remote value itself is not retrieved.  Names in neither
        table raise AttributeError.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name in self.__attributes:
            return None
        raise AttributeError(name)
    __getattr__.DebuggerStepThrough = 1

    def __getattributes(self):
        "Fetch the remote object's attribute name table."
        table = self.sockio.remotecall(self.oid, "__attributes__", (), {})
        self.__attributes = table

    def __getmethods(self):
        "Fetch the remote object's method name table."
        table = self.sockio.remotecall(self.oid, "__methods__", (), {})
        self.__methods = table
450
451def _getmethods(obj, methods):
452 # Helper to get a list of methods from an object
453 # Adds names to dictionary argument 'methods'
454 for name in dir(obj):
455 attr = getattr(obj, name)
456 if callable(attr):
457 methods[name] = 1
458 if type(obj) == types.InstanceType:
459 _getmethods(obj.__class__, methods)
460 if type(obj) == types.ClassType:
461 for super in obj.__bases__:
462 _getmethods(super, methods)
463
464def _getattributes(obj, attributes):
465 for name in dir(obj):
466 attr = getattr(obj, name)
467 if not callable(attr):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000468 attributes[name] = 1
Chui Tey5d2af632002-05-26 13:36:41 +0000469
class MethodProxy:
    "Callable that forwards its arguments to one method of a remote object."

    def __init__(self, sockio, oid, name):
        self.sockio = sockio    # connection used for the call
        self.oid = oid          # id of the target object
        self.name = name        # name of the method to invoke

    def __call__(self, *args, **kwargs):
        "Perform the remote call and hand back whatever it returns."
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
480
481#
482# Self Test
483#
484
def testServer(addr):
    """Self-test, server side: serve one object to the client at *addr*.

    The RPCServer connects to *addr*, so the client side must already be
    listening there.
    """
    class RemotePerson:
        def __init__(self, name):
            self.name = name
        def greet(self, name):
            print("(someone called greet)")
            print("Hello %s, I am %s." % (name, self.name))
            print("")
        def getName(self):
            print("(someone called getName)")
            print("")
            return self.name
        def greet_this_guy(self, name):
            print("(someone called greet_this_guy)")
            print("About to greet %s ..." % name)
            remote_guy = self.server.current_handler.get_remote_proxy(name)
            remote_guy.greet("Thomas Edison")
            print("Done.")
            print("")

    person = RemotePerson("Thomas Edison")
    # RPCServer itself has no register() method and the handler does not
    # exist before a request is being handled, so the object goes into the
    # module-level table that handlers use by default.
    objecttable['thomas'] = person
    svr = RPCServer(addr)
    person.server = svr # only required if callbacks are used

    # svr.serve_forever()
    svr.handle_request()  # process once only
513
def testClient(addr):
    "demonstrates RPC Client"
    import time
    clt = RPCClient(addr)
    # The constructor only listens; accept() takes the server's connection
    # and initialises the socket I/O state needed by every remote call.
    clt.accept()
    thomas = clt.get_remote_proxy("thomas")
    print("The remote person's name is ...")
    print(thomas.getName())
    # print clt.remotecall("thomas", "getName", (), {})
    print("")
    time.sleep(1)
    print("Getting remote thomas to say hi...")
    thomas.greet("Alexander Bell")
    #clt.remotecall("thomas","greet",("Alexander Bell",), {})
    print("Done.")
    print("")
    time.sleep(2)

    # demonstrates remote server calling local instance
    class LocalPerson:
        def __init__(self, name):
            self.name = name
        def greet(self, name):
            print("You've greeted me!")
        def getName(self):
            return self.name

    person = LocalPerson("Alexander Bell")
    clt.register("alexander", person)
    thomas.greet_this_guy("alexander")
    # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
543
def test():
    """Run the self test.

    With the single command line argument '-server' the server side is
    run; in every other case the client side is run.
    """
    addr = ("localhost", 8833)
    wants_server = len(sys.argv) == 2 and sys.argv[1] == '-server'
    if wants_server:
        testServer(addr)
    else:
        testClient(addr)

if __name__ == '__main__':
    test()