blob: cd11dfa97d65f934b0287bdb96b46006a588bc9f [file] [log] [blame]
Kurt B. Kaiserb4179362002-07-26 00:06:42 +00001"""RPC Implemention, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
5has only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
42def unpickle_code(ms):
43 co = marshal.loads(ms)
44 assert isinstance(co, types.CodeType)
45 return co
46
47def pickle_code(co):
48 assert isinstance(co, types.CodeType)
49 ms = marshal.dumps(co)
50 return unpickle_code, (ms,)
51
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000052# XXX KBK 24Aug02 function pickling capability not used in Idle
53# def unpickle_function(ms):
54# return ms
Chui Tey5d2af632002-05-26 13:36:41 +000055
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000056# def pickle_function(fn):
57# assert isinstance(fn, type.FunctionType)
58# return `fn`
Chui Tey5d2af632002-05-26 13:36:41 +000059
60copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000061# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
Chui Tey5d2af632002-05-26 13:36:41 +000062
63BUFSIZE = 8*1024
64
65class RPCServer(SocketServer.TCPServer):
66
67 def __init__(self, addr, handlerclass=None):
68 if handlerclass is None:
69 handlerclass = RPCHandler
Chui Tey5d2af632002-05-26 13:36:41 +000070 SocketServer.TCPServer.__init__(self, addr, handlerclass)
71
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000072 def server_bind(self):
73 "Override TCPServer method, no bind() phase for connecting entity"
74 pass
Chui Tey5d2af632002-05-26 13:36:41 +000075
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000076 def server_activate(self):
77 """Override TCPServer method, connect() instead of listen()
78
79 Due to the reversed connection, self.server_address is actually the
80 address of the Idle Client to which we are connecting.
Chui Tey5d2af632002-05-26 13:36:41 +000081
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000082 """
83 self.socket.connect(self.server_address)
84
85 def get_request(self):
86 "Override TCPServer method, return already connected socket"
87 return self.socket, self.server_address
Chui Tey5d2af632002-05-26 13:36:41 +000088
89objecttable = {}
90
91class SocketIO:
92
93 debugging = 0
94
95 def __init__(self, sock, objtable=None, debugging=None):
96 self.mainthread = threading.currentThread()
97 if debugging is not None:
98 self.debugging = debugging
99 self.sock = sock
100 if objtable is None:
101 objtable = objecttable
102 self.objtable = objtable
103 self.statelock = threading.Lock()
104 self.responses = {}
105 self.cvars = {}
106
107 def close(self):
108 sock = self.sock
109 self.sock = None
110 if sock is not None:
111 sock.close()
112
113 def debug(self, *args):
114 if not self.debugging:
115 return
116 s = str(threading.currentThread().getName())
117 for a in args:
118 s = s + " " + str(a)
119 s = s + "\n"
120 sys.__stderr__.write(s)
121
122 def register(self, oid, object):
123 self.objtable[oid] = object
124
125 def unregister(self, oid):
126 try:
127 del self.objtable[oid]
128 except KeyError:
129 pass
130
131 def localcall(self, request):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000132 self.debug("localcall:", request)
Chui Tey5d2af632002-05-26 13:36:41 +0000133 try:
134 how, (oid, methodname, args, kwargs) = request
135 except TypeError:
136 return ("ERROR", "Bad request format")
137 assert how == "call"
138 if not self.objtable.has_key(oid):
139 return ("ERROR", "Unknown object id: %s" % `oid`)
140 obj = self.objtable[oid]
141 if methodname == "__methods__":
142 methods = {}
143 _getmethods(obj, methods)
144 return ("OK", methods)
145 if methodname == "__attributes__":
146 attributes = {}
147 _getattributes(obj, attributes)
148 return ("OK", attributes)
149 if not hasattr(obj, methodname):
150 return ("ERROR", "Unsupported method name: %s" % `methodname`)
151 method = getattr(obj, methodname)
152 try:
153 ret = method(*args, **kwargs)
154 if isinstance(ret, RemoteObject):
155 ret = remoteref(ret)
156 return ("OK", ret)
157 except:
158 ##traceback.print_exc(file=sys.__stderr__)
159 typ, val, tb = info = sys.exc_info()
160 sys.last_type, sys.last_value, sys.last_traceback = info
161 if isinstance(typ, type(Exception)):
162 # Class exceptions
163 mod = typ.__module__
164 name = typ.__name__
165 if issubclass(typ, Exception):
166 args = val.args
167 else:
168 args = (str(val),)
169 else:
170 # String exceptions
171 mod = None
172 name = typ
173 args = (str(val),)
174 tb = traceback.extract_tb(tb)
175 return ("EXCEPTION", (mod, name, args, tb))
176
177 def remotecall(self, oid, methodname, args, kwargs):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000178 self.debug("remotecall:", oid, methodname, args, kwargs)
Chui Tey5d2af632002-05-26 13:36:41 +0000179 seq = self.asynccall(oid, methodname, args, kwargs)
180 return self.asyncreturn(seq)
181
182 def asynccall(self, oid, methodname, args, kwargs):
183 request = ("call", (oid, methodname, args, kwargs))
184 seq = self.putrequest(request)
185 return seq
186
187 def asyncreturn(self, seq):
188 response = self.getresponse(seq)
189 return self.decoderesponse(response)
190
191 def decoderesponse(self, response):
192 how, what = response
193 if how == "OK":
194 return what
195 if how == "EXCEPTION":
196 mod, name, args, tb = what
197 self.traceback = tb
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000198 if mod: # not string exception
Chui Tey5d2af632002-05-26 13:36:41 +0000199 try:
200 __import__(mod)
201 module = sys.modules[mod]
202 except ImportError:
203 pass
204 else:
205 try:
206 cls = getattr(module, name)
207 except AttributeError:
208 pass
209 else:
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000210 # instantiate a built-in exception object and raise it
Chui Tey5d2af632002-05-26 13:36:41 +0000211 raise getattr(__import__(mod), name)(*args)
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000212 name = mod + "." + name
213 # do the best we can:
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000214 raise name, args
Chui Tey5d2af632002-05-26 13:36:41 +0000215 if how == "ERROR":
216 raise RuntimeError, what
217 raise SystemError, (how, what)
218
219 def mainloop(self):
220 try:
221 self.getresponse(None)
222 except EOFError:
223 pass
224
225 def getresponse(self, myseq):
226 response = self._getresponse(myseq)
227 if response is not None:
228 how, what = response
229 if how == "OK":
230 response = how, self._proxify(what)
231 return response
232
233 def _proxify(self, obj):
234 if isinstance(obj, RemoteProxy):
235 return RPCProxy(self, obj.oid)
236 if isinstance(obj, types.ListType):
237 return map(self._proxify, obj)
238 # XXX Check for other types -- not currently needed
239 return obj
240
241 def _getresponse(self, myseq):
242 if threading.currentThread() is self.mainthread:
243 # Main thread: does all reading of requests and responses
244 while 1:
245 response = self.pollresponse(myseq, None)
246 if response is not None:
247 return response
248 else:
249 # Auxiliary thread: wait for notification from main thread
250 cvar = threading.Condition(self.statelock)
251 self.statelock.acquire()
252 self.cvars[myseq] = cvar
253 while not self.responses.has_key(myseq):
254 cvar.wait()
255 response = self.responses[myseq]
256 del self.responses[myseq]
257 del self.cvars[myseq]
258 self.statelock.release()
259 return response
260
261 def putrequest(self, request):
262 seq = self.newseq()
263 self.putmessage((seq, request))
264 return seq
265
266 nextseq = 0
267
268 def newseq(self):
269 self.nextseq = seq = self.nextseq + 2
270 return seq
271
272 def putmessage(self, message):
273 try:
274 s = pickle.dumps(message)
275 except:
276 print >>sys.__stderr__, "Cannot pickle:", `message`
277 raise
278 s = struct.pack("<i", len(s)) + s
279 while len(s) > 0:
280 n = self.sock.send(s)
281 s = s[n:]
282
283 def ioready(self, wait=0.0):
284 r, w, x = select.select([self.sock.fileno()], [], [], wait)
285 return len(r)
286
287 buffer = ""
288 bufneed = 4
289 bufstate = 0 # meaning: 0 => reading count; 1 => reading data
290
291 def pollpacket(self, wait=0.0):
292 self._stage0()
293 if len(self.buffer) < self.bufneed:
294 if not self.ioready(wait):
295 return None
296 try:
297 s = self.sock.recv(BUFSIZE)
298 except socket.error:
299 raise EOFError
300 if len(s) == 0:
301 raise EOFError
302 self.buffer += s
303 self._stage0()
304 return self._stage1()
305
306 def _stage0(self):
307 if self.bufstate == 0 and len(self.buffer) >= 4:
308 s = self.buffer[:4]
309 self.buffer = self.buffer[4:]
310 self.bufneed = struct.unpack("<i", s)[0]
311 self.bufstate = 1
312
313 def _stage1(self):
314 if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
315 packet = self.buffer[:self.bufneed]
316 self.buffer = self.buffer[self.bufneed:]
317 self.bufneed = 4
318 self.bufstate = 0
319 return packet
320
321 def pollmessage(self, wait=0.0):
322 packet = self.pollpacket(wait)
323 if packet is None:
324 return None
325 try:
326 message = pickle.loads(packet)
327 except:
328 print >>sys.__stderr__, "-----------------------"
329 print >>sys.__stderr__, "cannot unpickle packet:", `packet`
330 traceback.print_stack(file=sys.__stderr__)
331 print >>sys.__stderr__, "-----------------------"
332 raise
333 return message
334
335 def pollresponse(self, myseq, wait=0.0):
336 # Loop while there's no more buffered input or until specific response
337 while 1:
338 message = self.pollmessage(wait)
339 if message is None:
340 return None
341 wait = 0.0
342 seq, resq = message
343 if resq[0] == "call":
344 response = self.localcall(resq)
345 self.putmessage((seq, response))
346 continue
347 elif seq == myseq:
348 return resq
349 else:
350 self.statelock.acquire()
351 self.responses[seq] = resq
352 cv = self.cvars.get(seq)
353 if cv is not None:
354 cv.notify()
355 self.statelock.release()
356 continue
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000357
358#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000359
360class RemoteObject:
361 # Token mix-in class
362 pass
363
364def remoteref(obj):
365 oid = id(obj)
366 objecttable[oid] = obj
367 return RemoteProxy(oid)
368
369class RemoteProxy:
370
371 def __init__(self, oid):
372 self.oid = oid
373
374class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
375
376 debugging = 0
377
378 def __init__(self, sock, addr, svr):
379 svr.current_handler = self ## cgt xxx
380 SocketIO.__init__(self, sock)
381 SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
382
Chui Tey5d2af632002-05-26 13:36:41 +0000383 def handle(self):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000384 "handle() method required by SocketServer"
Chui Tey5d2af632002-05-26 13:36:41 +0000385 self.mainloop()
386
387 def get_remote_proxy(self, oid):
388 return RPCProxy(self, oid)
389
390class RPCClient(SocketIO):
391
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000392 nextseq = 1 # Requests coming from the client are odd numbered
Chui Tey5d2af632002-05-26 13:36:41 +0000393
394 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000395 self.listening_sock = socket.socket(family, type)
396 self.listening_sock.setsockopt(socket.SOL_SOCKET,
397 socket.SO_REUSEADDR, 1)
398 self.listening_sock.bind(address)
399 self.listening_sock.listen(1)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000400
401 def accept(self):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000402 working_sock, address = self.listening_sock.accept()
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000403 if address[0] == '127.0.0.1':
404 print>>sys.__stderr__, "Idle accepted connection from ", address
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000405 SocketIO.__init__(self, working_sock)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000406 else:
407 print>>sys.__stderr__, "Invalid host: ", address
408 raise socket.error
Chui Tey5d2af632002-05-26 13:36:41 +0000409
410 def get_remote_proxy(self, oid):
411 return RPCProxy(self, oid)
412
413class RPCProxy:
414
415 __methods = None
416 __attributes = None
417
418 def __init__(self, sockio, oid):
419 self.sockio = sockio
420 self.oid = oid
421
422 def __getattr__(self, name):
423 if self.__methods is None:
424 self.__getmethods()
425 if self.__methods.get(name):
426 return MethodProxy(self.sockio, self.oid, name)
427 if self.__attributes is None:
428 self.__getattributes()
429 if not self.__attributes.has_key(name):
430 raise AttributeError, name
431 __getattr__.DebuggerStepThrough=1
432
433 def __getattributes(self):
434 self.__attributes = self.sockio.remotecall(self.oid,
435 "__attributes__", (), {})
436
437 def __getmethods(self):
438 self.__methods = self.sockio.remotecall(self.oid,
439 "__methods__", (), {})
440
441def _getmethods(obj, methods):
442 # Helper to get a list of methods from an object
443 # Adds names to dictionary argument 'methods'
444 for name in dir(obj):
445 attr = getattr(obj, name)
446 if callable(attr):
447 methods[name] = 1
448 if type(obj) == types.InstanceType:
449 _getmethods(obj.__class__, methods)
450 if type(obj) == types.ClassType:
451 for super in obj.__bases__:
452 _getmethods(super, methods)
453
454def _getattributes(obj, attributes):
455 for name in dir(obj):
456 attr = getattr(obj, name)
457 if not callable(attr):
458 attributes[name] = 1
459
460class MethodProxy:
461
462 def __init__(self, sockio, oid, name):
463 self.sockio = sockio
464 self.oid = oid
465 self.name = name
466
467 def __call__(self, *args, **kwargs):
468 value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
469 return value
470
471#
472# Self Test
473#
474
475def testServer(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000476 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000477 class RemotePerson:
478 def __init__(self,name):
479 self.name = name
480 def greet(self, name):
481 print "(someone called greet)"
482 print "Hello %s, I am %s." % (name, self.name)
483 print
484 def getName(self):
485 print "(someone called getName)"
486 print
487 return self.name
488 def greet_this_guy(self, name):
489 print "(someone called greet_this_guy)"
490 print "About to greet %s ..." % name
491 remote_guy = self.server.current_handler.get_remote_proxy(name)
492 remote_guy.greet("Thomas Edison")
493 print "Done."
494 print
495
496 person = RemotePerson("Thomas Edison")
497 svr = RPCServer(addr)
498 svr.register('thomas', person)
499 person.server = svr # only required if callbacks are used
500
501 # svr.serve_forever()
502 svr.handle_request() # process once only
503
504def testClient(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000505 "demonstrates RPC Client"
506 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000507 import time
508 clt=RPCClient(addr)
509 thomas = clt.get_remote_proxy("thomas")
510 print "The remote person's name is ..."
511 print thomas.getName()
512 # print clt.remotecall("thomas", "getName", (), {})
513 print
514 time.sleep(1)
515 print "Getting remote thomas to say hi..."
516 thomas.greet("Alexander Bell")
517 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
518 print "Done."
519 print
520 time.sleep(2)
Chui Tey5d2af632002-05-26 13:36:41 +0000521 # demonstrates remote server calling local instance
522 class LocalPerson:
523 def __init__(self,name):
524 self.name = name
525 def greet(self, name):
526 print "You've greeted me!"
527 def getName(self):
528 return self.name
529 person = LocalPerson("Alexander Bell")
530 clt.register("alexander",person)
531 thomas.greet_this_guy("alexander")
532 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
533
534def test():
535 addr=("localhost",8833)
536 if len(sys.argv) == 2:
537 if sys.argv[1]=='-server':
538 testServer(addr)
539 return
540 testClient(addr)
541
542if __name__ == '__main__':
543 test()
544
545