blob: 4bbf0bb786a7ca4b6147fb9d0515a73f8cd5c591 [file] [log] [blame]
Kurt B. Kaiserb4179362002-07-26 00:06:42 +00001"""RPC Implemention, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
5has only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
42def unpickle_code(ms):
43 co = marshal.loads(ms)
44 assert isinstance(co, types.CodeType)
45 return co
46
47def pickle_code(co):
48 assert isinstance(co, types.CodeType)
49 ms = marshal.dumps(co)
50 return unpickle_code, (ms,)
51
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000052# XXX KBK 24Aug02 function pickling capability not used in Idle
53# def unpickle_function(ms):
54# return ms
Chui Tey5d2af632002-05-26 13:36:41 +000055
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000056# def pickle_function(fn):
57# assert isinstance(fn, type.FunctionType)
58# return `fn`
Chui Tey5d2af632002-05-26 13:36:41 +000059
60copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000061# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
Chui Tey5d2af632002-05-26 13:36:41 +000062
63BUFSIZE = 8*1024
64
65class RPCServer(SocketServer.TCPServer):
66
67 def __init__(self, addr, handlerclass=None):
68 if handlerclass is None:
69 handlerclass = RPCHandler
Chui Tey5d2af632002-05-26 13:36:41 +000070 SocketServer.TCPServer.__init__(self, addr, handlerclass)
71
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000072 def server_bind(self):
73 "Override TCPServer method, no bind() phase for connecting entity"
74 pass
Chui Tey5d2af632002-05-26 13:36:41 +000075
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000076 def server_activate(self):
77 """Override TCPServer method, connect() instead of listen()
78
79 Due to the reversed connection, self.server_address is actually the
80 address of the Idle Client to which we are connecting.
Chui Tey5d2af632002-05-26 13:36:41 +000081
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000082 """
83 self.socket.connect(self.server_address)
84
85 def get_request(self):
86 "Override TCPServer method, return already connected socket"
87 return self.socket, self.server_address
Chui Tey5d2af632002-05-26 13:36:41 +000088
89objecttable = {}
90
91class SocketIO:
92
Kurt B. Kaiser6e44cc22002-11-30 06:18:00 +000093 debugging = False
Chui Tey5d2af632002-05-26 13:36:41 +000094
95 def __init__(self, sock, objtable=None, debugging=None):
96 self.mainthread = threading.currentThread()
97 if debugging is not None:
98 self.debugging = debugging
99 self.sock = sock
100 if objtable is None:
101 objtable = objecttable
102 self.objtable = objtable
103 self.statelock = threading.Lock()
104 self.responses = {}
105 self.cvars = {}
106
107 def close(self):
108 sock = self.sock
109 self.sock = None
110 if sock is not None:
111 sock.close()
112
113 def debug(self, *args):
114 if not self.debugging:
115 return
116 s = str(threading.currentThread().getName())
117 for a in args:
118 s = s + " " + str(a)
119 s = s + "\n"
120 sys.__stderr__.write(s)
121
122 def register(self, oid, object):
123 self.objtable[oid] = object
124
125 def unregister(self, oid):
126 try:
127 del self.objtable[oid]
128 except KeyError:
129 pass
130
131 def localcall(self, request):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000132 self.debug("localcall:", request)
Chui Tey5d2af632002-05-26 13:36:41 +0000133 try:
134 how, (oid, methodname, args, kwargs) = request
135 except TypeError:
136 return ("ERROR", "Bad request format")
137 assert how == "call"
138 if not self.objtable.has_key(oid):
139 return ("ERROR", "Unknown object id: %s" % `oid`)
140 obj = self.objtable[oid]
141 if methodname == "__methods__":
142 methods = {}
143 _getmethods(obj, methods)
144 return ("OK", methods)
145 if methodname == "__attributes__":
146 attributes = {}
147 _getattributes(obj, attributes)
148 return ("OK", attributes)
149 if not hasattr(obj, methodname):
150 return ("ERROR", "Unsupported method name: %s" % `methodname`)
151 method = getattr(obj, methodname)
152 try:
153 ret = method(*args, **kwargs)
154 if isinstance(ret, RemoteObject):
155 ret = remoteref(ret)
156 return ("OK", ret)
157 except:
158 ##traceback.print_exc(file=sys.__stderr__)
159 typ, val, tb = info = sys.exc_info()
160 sys.last_type, sys.last_value, sys.last_traceback = info
161 if isinstance(typ, type(Exception)):
162 # Class exceptions
163 mod = typ.__module__
164 name = typ.__name__
165 if issubclass(typ, Exception):
166 args = val.args
167 else:
168 args = (str(val),)
169 else:
170 # String exceptions
171 mod = None
172 name = typ
173 args = (str(val),)
174 tb = traceback.extract_tb(tb)
175 return ("EXCEPTION", (mod, name, args, tb))
176
177 def remotecall(self, oid, methodname, args, kwargs):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000178 self.debug("remotecall:", oid, methodname, args, kwargs)
Chui Tey5d2af632002-05-26 13:36:41 +0000179 seq = self.asynccall(oid, methodname, args, kwargs)
Kurt B. Kaiser5afa1df2002-10-10 08:25:24 +0000180 ret = self.asyncreturn(seq)
181 self.debug("return:", ret)
182 return ret
Chui Tey5d2af632002-05-26 13:36:41 +0000183
184 def asynccall(self, oid, methodname, args, kwargs):
Kurt B. Kaiser5afa1df2002-10-10 08:25:24 +0000185 self.debug("asyncall:", oid, methodname, args, kwargs)
Chui Tey5d2af632002-05-26 13:36:41 +0000186 request = ("call", (oid, methodname, args, kwargs))
187 seq = self.putrequest(request)
188 return seq
189
190 def asyncreturn(self, seq):
191 response = self.getresponse(seq)
Kurt B. Kaiser6e44cc22002-11-30 06:18:00 +0000192 self.debug("asyncreturn:", response)
Chui Tey5d2af632002-05-26 13:36:41 +0000193 return self.decoderesponse(response)
194
195 def decoderesponse(self, response):
196 how, what = response
197 if how == "OK":
198 return what
199 if how == "EXCEPTION":
200 mod, name, args, tb = what
201 self.traceback = tb
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000202 if mod: # not string exception
Chui Tey5d2af632002-05-26 13:36:41 +0000203 try:
204 __import__(mod)
205 module = sys.modules[mod]
206 except ImportError:
207 pass
208 else:
209 try:
210 cls = getattr(module, name)
211 except AttributeError:
212 pass
213 else:
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000214 # instantiate a built-in exception object and raise it
Chui Tey5d2af632002-05-26 13:36:41 +0000215 raise getattr(__import__(mod), name)(*args)
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000216 name = mod + "." + name
217 # do the best we can:
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000218 raise name, args
Chui Tey5d2af632002-05-26 13:36:41 +0000219 if how == "ERROR":
220 raise RuntimeError, what
221 raise SystemError, (how, what)
222
223 def mainloop(self):
224 try:
225 self.getresponse(None)
226 except EOFError:
227 pass
228
229 def getresponse(self, myseq):
230 response = self._getresponse(myseq)
231 if response is not None:
232 how, what = response
233 if how == "OK":
234 response = how, self._proxify(what)
235 return response
236
237 def _proxify(self, obj):
238 if isinstance(obj, RemoteProxy):
239 return RPCProxy(self, obj.oid)
240 if isinstance(obj, types.ListType):
241 return map(self._proxify, obj)
242 # XXX Check for other types -- not currently needed
243 return obj
244
245 def _getresponse(self, myseq):
246 if threading.currentThread() is self.mainthread:
247 # Main thread: does all reading of requests and responses
248 while 1:
249 response = self.pollresponse(myseq, None)
250 if response is not None:
251 return response
252 else:
253 # Auxiliary thread: wait for notification from main thread
254 cvar = threading.Condition(self.statelock)
255 self.statelock.acquire()
256 self.cvars[myseq] = cvar
257 while not self.responses.has_key(myseq):
258 cvar.wait()
259 response = self.responses[myseq]
260 del self.responses[myseq]
261 del self.cvars[myseq]
262 self.statelock.release()
263 return response
264
265 def putrequest(self, request):
266 seq = self.newseq()
267 self.putmessage((seq, request))
268 return seq
269
270 nextseq = 0
271
272 def newseq(self):
273 self.nextseq = seq = self.nextseq + 2
274 return seq
275
276 def putmessage(self, message):
277 try:
278 s = pickle.dumps(message)
279 except:
280 print >>sys.__stderr__, "Cannot pickle:", `message`
281 raise
282 s = struct.pack("<i", len(s)) + s
283 while len(s) > 0:
284 n = self.sock.send(s)
285 s = s[n:]
286
287 def ioready(self, wait=0.0):
288 r, w, x = select.select([self.sock.fileno()], [], [], wait)
289 return len(r)
290
291 buffer = ""
292 bufneed = 4
293 bufstate = 0 # meaning: 0 => reading count; 1 => reading data
294
295 def pollpacket(self, wait=0.0):
296 self._stage0()
297 if len(self.buffer) < self.bufneed:
298 if not self.ioready(wait):
299 return None
300 try:
301 s = self.sock.recv(BUFSIZE)
302 except socket.error:
303 raise EOFError
304 if len(s) == 0:
305 raise EOFError
306 self.buffer += s
307 self._stage0()
308 return self._stage1()
309
310 def _stage0(self):
311 if self.bufstate == 0 and len(self.buffer) >= 4:
312 s = self.buffer[:4]
313 self.buffer = self.buffer[4:]
314 self.bufneed = struct.unpack("<i", s)[0]
315 self.bufstate = 1
316
317 def _stage1(self):
318 if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
319 packet = self.buffer[:self.bufneed]
320 self.buffer = self.buffer[self.bufneed:]
321 self.bufneed = 4
322 self.bufstate = 0
323 return packet
324
325 def pollmessage(self, wait=0.0):
326 packet = self.pollpacket(wait)
327 if packet is None:
328 return None
329 try:
330 message = pickle.loads(packet)
331 except:
332 print >>sys.__stderr__, "-----------------------"
333 print >>sys.__stderr__, "cannot unpickle packet:", `packet`
334 traceback.print_stack(file=sys.__stderr__)
335 print >>sys.__stderr__, "-----------------------"
336 raise
337 return message
338
339 def pollresponse(self, myseq, wait=0.0):
340 # Loop while there's no more buffered input or until specific response
341 while 1:
342 message = self.pollmessage(wait)
343 if message is None:
344 return None
345 wait = 0.0
346 seq, resq = message
347 if resq[0] == "call":
348 response = self.localcall(resq)
349 self.putmessage((seq, response))
350 continue
351 elif seq == myseq:
352 return resq
353 else:
354 self.statelock.acquire()
355 self.responses[seq] = resq
356 cv = self.cvars.get(seq)
357 if cv is not None:
358 cv.notify()
359 self.statelock.release()
360 continue
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000361
362#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000363
364class RemoteObject:
365 # Token mix-in class
366 pass
367
368def remoteref(obj):
369 oid = id(obj)
370 objecttable[oid] = obj
371 return RemoteProxy(oid)
372
373class RemoteProxy:
374
375 def __init__(self, oid):
376 self.oid = oid
377
378class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
379
380 debugging = 0
381
382 def __init__(self, sock, addr, svr):
383 svr.current_handler = self ## cgt xxx
384 SocketIO.__init__(self, sock)
385 SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
386
Chui Tey5d2af632002-05-26 13:36:41 +0000387 def handle(self):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000388 "handle() method required by SocketServer"
Chui Tey5d2af632002-05-26 13:36:41 +0000389 self.mainloop()
390
391 def get_remote_proxy(self, oid):
392 return RPCProxy(self, oid)
393
394class RPCClient(SocketIO):
395
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000396 nextseq = 1 # Requests coming from the client are odd numbered
Chui Tey5d2af632002-05-26 13:36:41 +0000397
398 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000399 self.listening_sock = socket.socket(family, type)
400 self.listening_sock.setsockopt(socket.SOL_SOCKET,
401 socket.SO_REUSEADDR, 1)
402 self.listening_sock.bind(address)
403 self.listening_sock.listen(1)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000404
405 def accept(self):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000406 working_sock, address = self.listening_sock.accept()
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000407 if address[0] == '127.0.0.1':
408 print>>sys.__stderr__, "Idle accepted connection from ", address
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000409 SocketIO.__init__(self, working_sock)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000410 else:
411 print>>sys.__stderr__, "Invalid host: ", address
412 raise socket.error
Chui Tey5d2af632002-05-26 13:36:41 +0000413
414 def get_remote_proxy(self, oid):
415 return RPCProxy(self, oid)
416
417class RPCProxy:
418
419 __methods = None
420 __attributes = None
421
422 def __init__(self, sockio, oid):
423 self.sockio = sockio
424 self.oid = oid
425
426 def __getattr__(self, name):
427 if self.__methods is None:
428 self.__getmethods()
429 if self.__methods.get(name):
430 return MethodProxy(self.sockio, self.oid, name)
431 if self.__attributes is None:
432 self.__getattributes()
433 if not self.__attributes.has_key(name):
434 raise AttributeError, name
435 __getattr__.DebuggerStepThrough=1
436
437 def __getattributes(self):
438 self.__attributes = self.sockio.remotecall(self.oid,
439 "__attributes__", (), {})
440
441 def __getmethods(self):
442 self.__methods = self.sockio.remotecall(self.oid,
443 "__methods__", (), {})
444
445def _getmethods(obj, methods):
446 # Helper to get a list of methods from an object
447 # Adds names to dictionary argument 'methods'
448 for name in dir(obj):
449 attr = getattr(obj, name)
450 if callable(attr):
451 methods[name] = 1
452 if type(obj) == types.InstanceType:
453 _getmethods(obj.__class__, methods)
454 if type(obj) == types.ClassType:
455 for super in obj.__bases__:
456 _getmethods(super, methods)
457
458def _getattributes(obj, attributes):
459 for name in dir(obj):
460 attr = getattr(obj, name)
461 if not callable(attr):
462 attributes[name] = 1
463
464class MethodProxy:
465
466 def __init__(self, sockio, oid, name):
467 self.sockio = sockio
468 self.oid = oid
469 self.name = name
470
471 def __call__(self, *args, **kwargs):
472 value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
473 return value
474
475#
476# Self Test
477#
478
479def testServer(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000480 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000481 class RemotePerson:
482 def __init__(self,name):
483 self.name = name
484 def greet(self, name):
485 print "(someone called greet)"
486 print "Hello %s, I am %s." % (name, self.name)
487 print
488 def getName(self):
489 print "(someone called getName)"
490 print
491 return self.name
492 def greet_this_guy(self, name):
493 print "(someone called greet_this_guy)"
494 print "About to greet %s ..." % name
495 remote_guy = self.server.current_handler.get_remote_proxy(name)
496 remote_guy.greet("Thomas Edison")
497 print "Done."
498 print
499
500 person = RemotePerson("Thomas Edison")
501 svr = RPCServer(addr)
502 svr.register('thomas', person)
503 person.server = svr # only required if callbacks are used
504
505 # svr.serve_forever()
506 svr.handle_request() # process once only
507
508def testClient(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000509 "demonstrates RPC Client"
510 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000511 import time
512 clt=RPCClient(addr)
513 thomas = clt.get_remote_proxy("thomas")
514 print "The remote person's name is ..."
515 print thomas.getName()
516 # print clt.remotecall("thomas", "getName", (), {})
517 print
518 time.sleep(1)
519 print "Getting remote thomas to say hi..."
520 thomas.greet("Alexander Bell")
521 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
522 print "Done."
523 print
524 time.sleep(2)
Chui Tey5d2af632002-05-26 13:36:41 +0000525 # demonstrates remote server calling local instance
526 class LocalPerson:
527 def __init__(self,name):
528 self.name = name
529 def greet(self, name):
530 print "You've greeted me!"
531 def getName(self):
532 return self.name
533 person = LocalPerson("Alexander Bell")
534 clt.register("alexander",person)
535 thomas.greet_this_guy("alexander")
536 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
537
538def test():
539 addr=("localhost",8833)
540 if len(sys.argv) == 2:
541 if sys.argv[1]=='-server':
542 testServer(addr)
543 return
544 testClient(addr)
545
546if __name__ == '__main__':
547 test()
548
549