blob: 267dd60564bd3b8cc939996ba96f4d7fad669bc5 [file] [log] [blame]
Kurt B. Kaiserb4179362002-07-26 00:06:42 +00001"""RPC Implemention, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
5has only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
42def unpickle_code(ms):
43 co = marshal.loads(ms)
44 assert isinstance(co, types.CodeType)
45 return co
46
47def pickle_code(co):
48 assert isinstance(co, types.CodeType)
49 ms = marshal.dumps(co)
50 return unpickle_code, (ms,)
51
52def unpickle_function(ms):
53 return ms
54
55def pickle_function(fn):
56 assert isinstance(fn, type.FunctionType)
57 return `fn`
58
59copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
60copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
61
62BUFSIZE = 8*1024
63
64class RPCServer(SocketServer.TCPServer):
65
66 def __init__(self, addr, handlerclass=None):
67 if handlerclass is None:
68 handlerclass = RPCHandler
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000069# XXX KBK 25Jun02 Not used in Idlefork.
Kurt B. Kaiserffd3a422002-06-26 02:32:09 +000070# self.objtable = objecttable
Chui Tey5d2af632002-05-26 13:36:41 +000071 SocketServer.TCPServer.__init__(self, addr, handlerclass)
72
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000073 def server_bind(self):
74 "Override TCPServer method, no bind() phase for connecting entity"
75 pass
Chui Tey5d2af632002-05-26 13:36:41 +000076
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000077 def server_activate(self):
78 """Override TCPServer method, connect() instead of listen()
79
80 Due to the reversed connection, self.server_address is actually the
81 address of the Idle Client to which we are connecting.
Chui Tey5d2af632002-05-26 13:36:41 +000082
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000083 """
84 self.socket.connect(self.server_address)
85
86 def get_request(self):
87 "Override TCPServer method, return already connected socket"
88 return self.socket, self.server_address
89
Kurt B. Kaiserffd3a422002-06-26 02:32:09 +000090
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000091# XXX The following two methods are not currently used in Idlefork.
Kurt B. Kaiserffd3a422002-06-26 02:32:09 +000092# def register(self, oid, object):
93# self.objtable[oid] = object
94
95# def unregister(self, oid):
96# try:
97# del self.objtable[oid]
98# except KeyError:
99# pass
Chui Tey5d2af632002-05-26 13:36:41 +0000100
101
102objecttable = {}
103
104class SocketIO:
105
106 debugging = 0
107
108 def __init__(self, sock, objtable=None, debugging=None):
109 self.mainthread = threading.currentThread()
110 if debugging is not None:
111 self.debugging = debugging
112 self.sock = sock
113 if objtable is None:
114 objtable = objecttable
115 self.objtable = objtable
116 self.statelock = threading.Lock()
117 self.responses = {}
118 self.cvars = {}
119
120 def close(self):
121 sock = self.sock
122 self.sock = None
123 if sock is not None:
124 sock.close()
125
126 def debug(self, *args):
127 if not self.debugging:
128 return
129 s = str(threading.currentThread().getName())
130 for a in args:
131 s = s + " " + str(a)
132 s = s + "\n"
133 sys.__stderr__.write(s)
134
135 def register(self, oid, object):
136 self.objtable[oid] = object
137
138 def unregister(self, oid):
139 try:
140 del self.objtable[oid]
141 except KeyError:
142 pass
143
144 def localcall(self, request):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000145 self.debug("localcall:", request)
Chui Tey5d2af632002-05-26 13:36:41 +0000146 try:
147 how, (oid, methodname, args, kwargs) = request
148 except TypeError:
149 return ("ERROR", "Bad request format")
150 assert how == "call"
151 if not self.objtable.has_key(oid):
152 return ("ERROR", "Unknown object id: %s" % `oid`)
153 obj = self.objtable[oid]
154 if methodname == "__methods__":
155 methods = {}
156 _getmethods(obj, methods)
157 return ("OK", methods)
158 if methodname == "__attributes__":
159 attributes = {}
160 _getattributes(obj, attributes)
161 return ("OK", attributes)
162 if not hasattr(obj, methodname):
163 return ("ERROR", "Unsupported method name: %s" % `methodname`)
164 method = getattr(obj, methodname)
165 try:
166 ret = method(*args, **kwargs)
167 if isinstance(ret, RemoteObject):
168 ret = remoteref(ret)
169 return ("OK", ret)
170 except:
171 ##traceback.print_exc(file=sys.__stderr__)
172 typ, val, tb = info = sys.exc_info()
173 sys.last_type, sys.last_value, sys.last_traceback = info
174 if isinstance(typ, type(Exception)):
175 # Class exceptions
176 mod = typ.__module__
177 name = typ.__name__
178 if issubclass(typ, Exception):
179 args = val.args
180 else:
181 args = (str(val),)
182 else:
183 # String exceptions
184 mod = None
185 name = typ
186 args = (str(val),)
187 tb = traceback.extract_tb(tb)
188 return ("EXCEPTION", (mod, name, args, tb))
189
190 def remotecall(self, oid, methodname, args, kwargs):
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000191 self.debug("remotecall:", oid, methodname, args, kwargs)
Chui Tey5d2af632002-05-26 13:36:41 +0000192 seq = self.asynccall(oid, methodname, args, kwargs)
193 return self.asyncreturn(seq)
194
195 def asynccall(self, oid, methodname, args, kwargs):
196 request = ("call", (oid, methodname, args, kwargs))
197 seq = self.putrequest(request)
198 return seq
199
200 def asyncreturn(self, seq):
201 response = self.getresponse(seq)
202 return self.decoderesponse(response)
203
204 def decoderesponse(self, response):
205 how, what = response
206 if how == "OK":
207 return what
208 if how == "EXCEPTION":
209 mod, name, args, tb = what
210 self.traceback = tb
211 if mod:
212 try:
213 __import__(mod)
214 module = sys.modules[mod]
215 except ImportError:
216 pass
217 else:
218 try:
219 cls = getattr(module, name)
220 except AttributeError:
221 pass
222 else:
223 raise getattr(__import__(mod), name)(*args)
Kurt B. Kaiser0e3a5772002-06-16 03:32:24 +0000224 raise name, args
Chui Tey5d2af632002-05-26 13:36:41 +0000225 if how == "ERROR":
226 raise RuntimeError, what
227 raise SystemError, (how, what)
228
229 def mainloop(self):
230 try:
231 self.getresponse(None)
232 except EOFError:
233 pass
234
235 def getresponse(self, myseq):
236 response = self._getresponse(myseq)
237 if response is not None:
238 how, what = response
239 if how == "OK":
240 response = how, self._proxify(what)
241 return response
242
243 def _proxify(self, obj):
244 if isinstance(obj, RemoteProxy):
245 return RPCProxy(self, obj.oid)
246 if isinstance(obj, types.ListType):
247 return map(self._proxify, obj)
248 # XXX Check for other types -- not currently needed
249 return obj
250
251 def _getresponse(self, myseq):
252 if threading.currentThread() is self.mainthread:
253 # Main thread: does all reading of requests and responses
254 while 1:
255 response = self.pollresponse(myseq, None)
256 if response is not None:
257 return response
258 else:
259 # Auxiliary thread: wait for notification from main thread
260 cvar = threading.Condition(self.statelock)
261 self.statelock.acquire()
262 self.cvars[myseq] = cvar
263 while not self.responses.has_key(myseq):
264 cvar.wait()
265 response = self.responses[myseq]
266 del self.responses[myseq]
267 del self.cvars[myseq]
268 self.statelock.release()
269 return response
270
271 def putrequest(self, request):
272 seq = self.newseq()
273 self.putmessage((seq, request))
274 return seq
275
276 nextseq = 0
277
278 def newseq(self):
279 self.nextseq = seq = self.nextseq + 2
280 return seq
281
282 def putmessage(self, message):
283 try:
284 s = pickle.dumps(message)
285 except:
286 print >>sys.__stderr__, "Cannot pickle:", `message`
287 raise
288 s = struct.pack("<i", len(s)) + s
289 while len(s) > 0:
290 n = self.sock.send(s)
291 s = s[n:]
292
293 def ioready(self, wait=0.0):
294 r, w, x = select.select([self.sock.fileno()], [], [], wait)
295 return len(r)
296
297 buffer = ""
298 bufneed = 4
299 bufstate = 0 # meaning: 0 => reading count; 1 => reading data
300
301 def pollpacket(self, wait=0.0):
302 self._stage0()
303 if len(self.buffer) < self.bufneed:
304 if not self.ioready(wait):
305 return None
306 try:
307 s = self.sock.recv(BUFSIZE)
308 except socket.error:
309 raise EOFError
310 if len(s) == 0:
311 raise EOFError
312 self.buffer += s
313 self._stage0()
314 return self._stage1()
315
316 def _stage0(self):
317 if self.bufstate == 0 and len(self.buffer) >= 4:
318 s = self.buffer[:4]
319 self.buffer = self.buffer[4:]
320 self.bufneed = struct.unpack("<i", s)[0]
321 self.bufstate = 1
322
323 def _stage1(self):
324 if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
325 packet = self.buffer[:self.bufneed]
326 self.buffer = self.buffer[self.bufneed:]
327 self.bufneed = 4
328 self.bufstate = 0
329 return packet
330
331 def pollmessage(self, wait=0.0):
332 packet = self.pollpacket(wait)
333 if packet is None:
334 return None
335 try:
336 message = pickle.loads(packet)
337 except:
338 print >>sys.__stderr__, "-----------------------"
339 print >>sys.__stderr__, "cannot unpickle packet:", `packet`
340 traceback.print_stack(file=sys.__stderr__)
341 print >>sys.__stderr__, "-----------------------"
342 raise
343 return message
344
345 def pollresponse(self, myseq, wait=0.0):
346 # Loop while there's no more buffered input or until specific response
347 while 1:
348 message = self.pollmessage(wait)
349 if message is None:
350 return None
351 wait = 0.0
352 seq, resq = message
353 if resq[0] == "call":
354 response = self.localcall(resq)
355 self.putmessage((seq, response))
356 continue
357 elif seq == myseq:
358 return resq
359 else:
360 self.statelock.acquire()
361 self.responses[seq] = resq
362 cv = self.cvars.get(seq)
363 if cv is not None:
364 cv.notify()
365 self.statelock.release()
366 continue
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000367
368#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000369
370class RemoteObject:
371 # Token mix-in class
372 pass
373
374def remoteref(obj):
375 oid = id(obj)
376 objecttable[oid] = obj
377 return RemoteProxy(oid)
378
379class RemoteProxy:
380
381 def __init__(self, oid):
382 self.oid = oid
383
384class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
385
386 debugging = 0
387
388 def __init__(self, sock, addr, svr):
389 svr.current_handler = self ## cgt xxx
390 SocketIO.__init__(self, sock)
391 SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
392
Chui Tey5d2af632002-05-26 13:36:41 +0000393 def handle(self):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000394 "handle() method required by SocketServer"
Chui Tey5d2af632002-05-26 13:36:41 +0000395 self.mainloop()
396
397 def get_remote_proxy(self, oid):
398 return RPCProxy(self, oid)
399
400class RPCClient(SocketIO):
401
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000402 nextseq = 1 # Requests coming from the client are odd numbered
Chui Tey5d2af632002-05-26 13:36:41 +0000403
404 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000405 self.sock = socket.socket(family, type)
406 self.sock.bind(address)
407 self.sock.listen(1)
408
409 def accept(self):
410 newsock, address = self.sock.accept()
411 if address[0] == '127.0.0.1':
412 print>>sys.__stderr__, "Idle accepted connection from ", address
413 SocketIO.__init__(self, newsock)
414 else:
415 print>>sys.__stderr__, "Invalid host: ", address
416 raise socket.error
Chui Tey5d2af632002-05-26 13:36:41 +0000417
418 def get_remote_proxy(self, oid):
419 return RPCProxy(self, oid)
420
421class RPCProxy:
422
423 __methods = None
424 __attributes = None
425
426 def __init__(self, sockio, oid):
427 self.sockio = sockio
428 self.oid = oid
429
430 def __getattr__(self, name):
431 if self.__methods is None:
432 self.__getmethods()
433 if self.__methods.get(name):
434 return MethodProxy(self.sockio, self.oid, name)
435 if self.__attributes is None:
436 self.__getattributes()
437 if not self.__attributes.has_key(name):
438 raise AttributeError, name
439 __getattr__.DebuggerStepThrough=1
440
441 def __getattributes(self):
442 self.__attributes = self.sockio.remotecall(self.oid,
443 "__attributes__", (), {})
444
445 def __getmethods(self):
446 self.__methods = self.sockio.remotecall(self.oid,
447 "__methods__", (), {})
448
449def _getmethods(obj, methods):
450 # Helper to get a list of methods from an object
451 # Adds names to dictionary argument 'methods'
452 for name in dir(obj):
453 attr = getattr(obj, name)
454 if callable(attr):
455 methods[name] = 1
456 if type(obj) == types.InstanceType:
457 _getmethods(obj.__class__, methods)
458 if type(obj) == types.ClassType:
459 for super in obj.__bases__:
460 _getmethods(super, methods)
461
462def _getattributes(obj, attributes):
463 for name in dir(obj):
464 attr = getattr(obj, name)
465 if not callable(attr):
466 attributes[name] = 1
467
468class MethodProxy:
469
470 def __init__(self, sockio, oid, name):
471 self.sockio = sockio
472 self.oid = oid
473 self.name = name
474
475 def __call__(self, *args, **kwargs):
476 value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
477 return value
478
479#
480# Self Test
481#
482
483def testServer(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000484 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000485 class RemotePerson:
486 def __init__(self,name):
487 self.name = name
488 def greet(self, name):
489 print "(someone called greet)"
490 print "Hello %s, I am %s." % (name, self.name)
491 print
492 def getName(self):
493 print "(someone called getName)"
494 print
495 return self.name
496 def greet_this_guy(self, name):
497 print "(someone called greet_this_guy)"
498 print "About to greet %s ..." % name
499 remote_guy = self.server.current_handler.get_remote_proxy(name)
500 remote_guy.greet("Thomas Edison")
501 print "Done."
502 print
503
504 person = RemotePerson("Thomas Edison")
505 svr = RPCServer(addr)
506 svr.register('thomas', person)
507 person.server = svr # only required if callbacks are used
508
509 # svr.serve_forever()
510 svr.handle_request() # process once only
511
512def testClient(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000513 "demonstrates RPC Client"
514 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000515 import time
516 clt=RPCClient(addr)
517 thomas = clt.get_remote_proxy("thomas")
518 print "The remote person's name is ..."
519 print thomas.getName()
520 # print clt.remotecall("thomas", "getName", (), {})
521 print
522 time.sleep(1)
523 print "Getting remote thomas to say hi..."
524 thomas.greet("Alexander Bell")
525 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
526 print "Done."
527 print
528 time.sleep(2)
Chui Tey5d2af632002-05-26 13:36:41 +0000529 # demonstrates remote server calling local instance
530 class LocalPerson:
531 def __init__(self,name):
532 self.name = name
533 def greet(self, name):
534 print "You've greeted me!"
535 def getName(self):
536 return self.name
537 person = LocalPerson("Alexander Bell")
538 clt.register("alexander",person)
539 thomas.greet_this_guy("alexander")
540 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
541
542def test():
543 addr=("localhost",8833)
544 if len(sys.argv) == 2:
545 if sys.argv[1]=='-server':
546 testServer(addr)
547 return
548 testClient(addr)
549
550if __name__ == '__main__':
551 test()
552
553