blob: 922a460c0c5d45639549d35aecc9af5717632fb2 [file] [log] [blame]
Kurt B. Kaiserb4179362002-07-26 00:06:42 +00001"""RPC Implemention, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
5has only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
42def unpickle_code(ms):
43 co = marshal.loads(ms)
44 assert isinstance(co, types.CodeType)
45 return co
46
47def pickle_code(co):
48 assert isinstance(co, types.CodeType)
49 ms = marshal.dumps(co)
50 return unpickle_code, (ms,)
51
52def unpickle_function(ms):
53 return ms
54
55def pickle_function(fn):
56 assert isinstance(fn, type.FunctionType)
57 return `fn`
58
59copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
60copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
61
62BUFSIZE = 8*1024
63
64class RPCServer(SocketServer.TCPServer):
65
66 def __init__(self, addr, handlerclass=None):
67 if handlerclass is None:
68 handlerclass = RPCHandler
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000069# XXX KBK 25Jun02 Not used in Idlefork.
Kurt B. Kaiserffd3a422002-06-26 02:32:09 +000070# self.objtable = objecttable
Chui Tey5d2af632002-05-26 13:36:41 +000071 SocketServer.TCPServer.__init__(self, addr, handlerclass)
72
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000073 def server_bind(self):
74 "Override TCPServer method, no bind() phase for connecting entity"
75 pass
Chui Tey5d2af632002-05-26 13:36:41 +000076
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000077 def server_activate(self):
78 """Override TCPServer method, connect() instead of listen()
79
80 Due to the reversed connection, self.server_address is actually the
81 address of the Idle Client to which we are connecting.
Chui Tey5d2af632002-05-26 13:36:41 +000082
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000083 """
84 self.socket.connect(self.server_address)
85
86 def get_request(self):
87 "Override TCPServer method, return already connected socket"
88 return self.socket, self.server_address
89
Kurt B. Kaiserffd3a422002-06-26 02:32:09 +000090
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000091# XXX The following two methods are not currently used in Idlefork.
Kurt B. Kaiserffd3a422002-06-26 02:32:09 +000092# def register(self, oid, object):
93# self.objtable[oid] = object
94
95# def unregister(self, oid):
96# try:
97# del self.objtable[oid]
98# except KeyError:
99# pass
Chui Tey5d2af632002-05-26 13:36:41 +0000100
101
102objecttable = {}
103
104class SocketIO:
105
106 debugging = 0
107
108 def __init__(self, sock, objtable=None, debugging=None):
109 self.mainthread = threading.currentThread()
110 if debugging is not None:
111 self.debugging = debugging
112 self.sock = sock
113 if objtable is None:
114 objtable = objecttable
115 self.objtable = objtable
116 self.statelock = threading.Lock()
117 self.responses = {}
118 self.cvars = {}
119
120 def close(self):
121 sock = self.sock
122 self.sock = None
123 if sock is not None:
124 sock.close()
125
126 def debug(self, *args):
127 if not self.debugging:
128 return
129 s = str(threading.currentThread().getName())
130 for a in args:
131 s = s + " " + str(a)
132 s = s + "\n"
133 sys.__stderr__.write(s)
134
135 def register(self, oid, object):
136 self.objtable[oid] = object
137
138 def unregister(self, oid):
139 try:
140 del self.objtable[oid]
141 except KeyError:
142 pass
143
144 def localcall(self, request):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000145 self.debug("localcall:", request)
Chui Tey5d2af632002-05-26 13:36:41 +0000146 try:
147 how, (oid, methodname, args, kwargs) = request
148 except TypeError:
149 return ("ERROR", "Bad request format")
150 assert how == "call"
151 if not self.objtable.has_key(oid):
152 return ("ERROR", "Unknown object id: %s" % `oid`)
153 obj = self.objtable[oid]
154 if methodname == "__methods__":
155 methods = {}
156 _getmethods(obj, methods)
157 return ("OK", methods)
158 if methodname == "__attributes__":
159 attributes = {}
160 _getattributes(obj, attributes)
161 return ("OK", attributes)
162 if not hasattr(obj, methodname):
163 return ("ERROR", "Unsupported method name: %s" % `methodname`)
164 method = getattr(obj, methodname)
165 try:
166 ret = method(*args, **kwargs)
167 if isinstance(ret, RemoteObject):
168 ret = remoteref(ret)
169 return ("OK", ret)
170 except:
171 ##traceback.print_exc(file=sys.__stderr__)
172 typ, val, tb = info = sys.exc_info()
173 sys.last_type, sys.last_value, sys.last_traceback = info
174 if isinstance(typ, type(Exception)):
175 # Class exceptions
176 mod = typ.__module__
177 name = typ.__name__
178 if issubclass(typ, Exception):
179 args = val.args
180 else:
181 args = (str(val),)
182 else:
183 # String exceptions
184 mod = None
185 name = typ
186 args = (str(val),)
187 tb = traceback.extract_tb(tb)
188 return ("EXCEPTION", (mod, name, args, tb))
189
190 def remotecall(self, oid, methodname, args, kwargs):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000191 self.debug("remotecall:", oid, methodname, args, kwargs)
Chui Tey5d2af632002-05-26 13:36:41 +0000192 seq = self.asynccall(oid, methodname, args, kwargs)
193 return self.asyncreturn(seq)
194
195 def asynccall(self, oid, methodname, args, kwargs):
196 request = ("call", (oid, methodname, args, kwargs))
197 seq = self.putrequest(request)
198 return seq
199
200 def asyncreturn(self, seq):
201 response = self.getresponse(seq)
202 return self.decoderesponse(response)
203
204 def decoderesponse(self, response):
205 how, what = response
206 if how == "OK":
207 return what
208 if how == "EXCEPTION":
209 mod, name, args, tb = what
210 self.traceback = tb
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000211 if mod: # not string exception
Chui Tey5d2af632002-05-26 13:36:41 +0000212 try:
213 __import__(mod)
214 module = sys.modules[mod]
215 except ImportError:
216 pass
217 else:
218 try:
219 cls = getattr(module, name)
220 except AttributeError:
221 pass
222 else:
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000223 # instantiate a built-in exception object and raise it
Chui Tey5d2af632002-05-26 13:36:41 +0000224 raise getattr(__import__(mod), name)(*args)
Kurt B. Kaisera552e3a2002-08-24 23:57:17 +0000225 name = mod + "." + name
226 # do the best we can:
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000227 raise name, args
Chui Tey5d2af632002-05-26 13:36:41 +0000228 if how == "ERROR":
229 raise RuntimeError, what
230 raise SystemError, (how, what)
231
232 def mainloop(self):
233 try:
234 self.getresponse(None)
235 except EOFError:
236 pass
237
238 def getresponse(self, myseq):
239 response = self._getresponse(myseq)
240 if response is not None:
241 how, what = response
242 if how == "OK":
243 response = how, self._proxify(what)
244 return response
245
246 def _proxify(self, obj):
247 if isinstance(obj, RemoteProxy):
248 return RPCProxy(self, obj.oid)
249 if isinstance(obj, types.ListType):
250 return map(self._proxify, obj)
251 # XXX Check for other types -- not currently needed
252 return obj
253
254 def _getresponse(self, myseq):
255 if threading.currentThread() is self.mainthread:
256 # Main thread: does all reading of requests and responses
257 while 1:
258 response = self.pollresponse(myseq, None)
259 if response is not None:
260 return response
261 else:
262 # Auxiliary thread: wait for notification from main thread
263 cvar = threading.Condition(self.statelock)
264 self.statelock.acquire()
265 self.cvars[myseq] = cvar
266 while not self.responses.has_key(myseq):
267 cvar.wait()
268 response = self.responses[myseq]
269 del self.responses[myseq]
270 del self.cvars[myseq]
271 self.statelock.release()
272 return response
273
274 def putrequest(self, request):
275 seq = self.newseq()
276 self.putmessage((seq, request))
277 return seq
278
279 nextseq = 0
280
281 def newseq(self):
282 self.nextseq = seq = self.nextseq + 2
283 return seq
284
285 def putmessage(self, message):
286 try:
287 s = pickle.dumps(message)
288 except:
289 print >>sys.__stderr__, "Cannot pickle:", `message`
290 raise
291 s = struct.pack("<i", len(s)) + s
292 while len(s) > 0:
293 n = self.sock.send(s)
294 s = s[n:]
295
296 def ioready(self, wait=0.0):
297 r, w, x = select.select([self.sock.fileno()], [], [], wait)
298 return len(r)
299
300 buffer = ""
301 bufneed = 4
302 bufstate = 0 # meaning: 0 => reading count; 1 => reading data
303
304 def pollpacket(self, wait=0.0):
305 self._stage0()
306 if len(self.buffer) < self.bufneed:
307 if not self.ioready(wait):
308 return None
309 try:
310 s = self.sock.recv(BUFSIZE)
311 except socket.error:
312 raise EOFError
313 if len(s) == 0:
314 raise EOFError
315 self.buffer += s
316 self._stage0()
317 return self._stage1()
318
319 def _stage0(self):
320 if self.bufstate == 0 and len(self.buffer) >= 4:
321 s = self.buffer[:4]
322 self.buffer = self.buffer[4:]
323 self.bufneed = struct.unpack("<i", s)[0]
324 self.bufstate = 1
325
326 def _stage1(self):
327 if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
328 packet = self.buffer[:self.bufneed]
329 self.buffer = self.buffer[self.bufneed:]
330 self.bufneed = 4
331 self.bufstate = 0
332 return packet
333
334 def pollmessage(self, wait=0.0):
335 packet = self.pollpacket(wait)
336 if packet is None:
337 return None
338 try:
339 message = pickle.loads(packet)
340 except:
341 print >>sys.__stderr__, "-----------------------"
342 print >>sys.__stderr__, "cannot unpickle packet:", `packet`
343 traceback.print_stack(file=sys.__stderr__)
344 print >>sys.__stderr__, "-----------------------"
345 raise
346 return message
347
348 def pollresponse(self, myseq, wait=0.0):
349 # Loop while there's no more buffered input or until specific response
350 while 1:
351 message = self.pollmessage(wait)
352 if message is None:
353 return None
354 wait = 0.0
355 seq, resq = message
356 if resq[0] == "call":
357 response = self.localcall(resq)
358 self.putmessage((seq, response))
359 continue
360 elif seq == myseq:
361 return resq
362 else:
363 self.statelock.acquire()
364 self.responses[seq] = resq
365 cv = self.cvars.get(seq)
366 if cv is not None:
367 cv.notify()
368 self.statelock.release()
369 continue
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000370
371#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000372
373class RemoteObject:
374 # Token mix-in class
375 pass
376
377def remoteref(obj):
378 oid = id(obj)
379 objecttable[oid] = obj
380 return RemoteProxy(oid)
381
382class RemoteProxy:
383
384 def __init__(self, oid):
385 self.oid = oid
386
387class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
388
389 debugging = 0
390
391 def __init__(self, sock, addr, svr):
392 svr.current_handler = self ## cgt xxx
393 SocketIO.__init__(self, sock)
394 SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
395
Chui Tey5d2af632002-05-26 13:36:41 +0000396 def handle(self):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000397 "handle() method required by SocketServer"
Chui Tey5d2af632002-05-26 13:36:41 +0000398 self.mainloop()
399
400 def get_remote_proxy(self, oid):
401 return RPCProxy(self, oid)
402
403class RPCClient(SocketIO):
404
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000405 nextseq = 1 # Requests coming from the client are odd numbered
Chui Tey5d2af632002-05-26 13:36:41 +0000406
407 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000408 self.sock = socket.socket(family, type)
Kurt B. Kaiser8dcdb772002-08-05 03:52:10 +0000409 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000410 self.sock.bind(address)
411 self.sock.listen(1)
412
413 def accept(self):
414 newsock, address = self.sock.accept()
415 if address[0] == '127.0.0.1':
416 print>>sys.__stderr__, "Idle accepted connection from ", address
417 SocketIO.__init__(self, newsock)
418 else:
419 print>>sys.__stderr__, "Invalid host: ", address
420 raise socket.error
Chui Tey5d2af632002-05-26 13:36:41 +0000421
422 def get_remote_proxy(self, oid):
423 return RPCProxy(self, oid)
424
425class RPCProxy:
426
427 __methods = None
428 __attributes = None
429
430 def __init__(self, sockio, oid):
431 self.sockio = sockio
432 self.oid = oid
433
434 def __getattr__(self, name):
435 if self.__methods is None:
436 self.__getmethods()
437 if self.__methods.get(name):
438 return MethodProxy(self.sockio, self.oid, name)
439 if self.__attributes is None:
440 self.__getattributes()
441 if not self.__attributes.has_key(name):
442 raise AttributeError, name
443 __getattr__.DebuggerStepThrough=1
444
445 def __getattributes(self):
446 self.__attributes = self.sockio.remotecall(self.oid,
447 "__attributes__", (), {})
448
449 def __getmethods(self):
450 self.__methods = self.sockio.remotecall(self.oid,
451 "__methods__", (), {})
452
453def _getmethods(obj, methods):
454 # Helper to get a list of methods from an object
455 # Adds names to dictionary argument 'methods'
456 for name in dir(obj):
457 attr = getattr(obj, name)
458 if callable(attr):
459 methods[name] = 1
460 if type(obj) == types.InstanceType:
461 _getmethods(obj.__class__, methods)
462 if type(obj) == types.ClassType:
463 for super in obj.__bases__:
464 _getmethods(super, methods)
465
466def _getattributes(obj, attributes):
467 for name in dir(obj):
468 attr = getattr(obj, name)
469 if not callable(attr):
470 attributes[name] = 1
471
472class MethodProxy:
473
474 def __init__(self, sockio, oid, name):
475 self.sockio = sockio
476 self.oid = oid
477 self.name = name
478
479 def __call__(self, *args, **kwargs):
480 value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
481 return value
482
483#
484# Self Test
485#
486
487def testServer(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000488 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000489 class RemotePerson:
490 def __init__(self,name):
491 self.name = name
492 def greet(self, name):
493 print "(someone called greet)"
494 print "Hello %s, I am %s." % (name, self.name)
495 print
496 def getName(self):
497 print "(someone called getName)"
498 print
499 return self.name
500 def greet_this_guy(self, name):
501 print "(someone called greet_this_guy)"
502 print "About to greet %s ..." % name
503 remote_guy = self.server.current_handler.get_remote_proxy(name)
504 remote_guy.greet("Thomas Edison")
505 print "Done."
506 print
507
508 person = RemotePerson("Thomas Edison")
509 svr = RPCServer(addr)
510 svr.register('thomas', person)
511 person.server = svr # only required if callbacks are used
512
513 # svr.serve_forever()
514 svr.handle_request() # process once only
515
516def testClient(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000517 "demonstrates RPC Client"
518 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000519 import time
520 clt=RPCClient(addr)
521 thomas = clt.get_remote_proxy("thomas")
522 print "The remote person's name is ..."
523 print thomas.getName()
524 # print clt.remotecall("thomas", "getName", (), {})
525 print
526 time.sleep(1)
527 print "Getting remote thomas to say hi..."
528 thomas.greet("Alexander Bell")
529 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
530 print "Done."
531 print
532 time.sleep(2)
Chui Tey5d2af632002-05-26 13:36:41 +0000533 # demonstrates remote server calling local instance
534 class LocalPerson:
535 def __init__(self,name):
536 self.name = name
537 def greet(self, name):
538 print "You've greeted me!"
539 def getName(self):
540 return self.name
541 person = LocalPerson("Alexander Bell")
542 clt.register("alexander",person)
543 thomas.greet_this_guy("alexander")
544 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
545
546def test():
547 addr=("localhost",8833)
548 if len(sys.argv) == 2:
549 if sys.argv[1]=='-server':
550 testServer(addr)
551 return
552 testClient(addr)
553
554if __name__ == '__main__':
555 test()
556
557