blob: 5bb0e646f19215a7cbf52872842beaf1fbcce5a2 [file] [log] [blame]
"""RPC Implementation, originally written for the Python Idle IDE

For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection.  Since Idle has
only one client per server, this was not a limitation.

   +---------------------------------+ +-------------+
   | SocketServer.BaseRequestHandler | | SocketIO    |
   +---------------------------------+ +-------------+
                   ^                   | register()  |
                   |                   | unregister()|
                   |                   +-------------+
                   |                      ^  ^
                   |                      |  |
                   | + -------------------+  |
                   | |                       |
   +-------------------------+        +-----------------+
   | RPCHandler              |        | RPCClient       |
   | [attribute of RPCServer]|        |                 |
   +-------------------------+        +-----------------+

The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.

See the Idle run.main() docstring for further information on how this was
accomplished in Idle.

"""
29
30import sys
Chui Tey5d2af632002-05-26 13:36:41 +000031import socket
32import select
33import SocketServer
34import struct
35import cPickle as pickle
36import threading
37import traceback
38import copy_reg
39import types
40import marshal
41
def unpickle_code(ms):
    """Rebuild a code object from its marshalled string *ms*.

    Raises AssertionError if the unmarshalled value is not a code object.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
46
def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns a (reconstructor, args) pair: unpickle_code and a 1-tuple holding
    the marshalled form of *co*.  Raises AssertionError if *co* is not a code
    object.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
51
def unpickle_function(ms):
    """Return *ms* unchanged; no function object is reconstructed."""
    return ms
54
def pickle_function(fn):
    """Return the repr() string of the function *fn*.

    Raises AssertionError if *fn* is not a plain Python function.
    """
    # The original tested against ``type.FunctionType``, which does not exist
    # and raised AttributeError on every call; the ``types`` module was meant.
    assert isinstance(fn, types.FunctionType)
    # NOTE(review): this function is registered as a copy_reg reducer, and a
    # reducer that returns a string is taken to name a global -- a repr() is
    # unlikely to resolve.  Return shape left unchanged; confirm the intent.
    return repr(fn)
58
# Register reduction functions with copy_reg: each call names a type, the
# function used to reduce instances of it, and the matching reconstructor.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
61
# Maximum number of bytes requested per recv() call in SocketIO.pollpacket().
BUFSIZE = 8*1024
63
class RPCServer(SocketServer.TCPServer):
    """TCPServer variant that connects out to its peer instead of listening.

    bind() is skipped and connect() replaces listen(), so the single
    "request" handed to the handler class is the already connected socket.
    """

    def __init__(self, addr, handlerclass=None):
        """Connect to *addr*; *handlerclass* defaults to RPCHandler."""
        handler = handlerclass
        if handler is None:
            handler = RPCHandler
        # XXX KBK 25Jun02 Not used in Idlefork.
        # self.objtable = objecttable
        SocketServer.TCPServer.__init__(self, addr, handler)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return (self.socket, self.server_address)

    # XXX The following two methods are not currently used in Idlefork.
    # def register(self, oid, object):
    #     self.objtable[oid] = object

    # def unregister(self, oid):
    #     try:
    #         del self.objtable[oid]
    #     except KeyError:
    #         pass
Chui Tey5d2af632002-05-26 13:36:41 +0000100
101
# Module-level map of object id -> object.  SocketIO uses it when no objtable
# is passed to its constructor, and remoteref() stores objects in it.
objecttable = {}
103
class SocketIO:
    """Pickle-based request/response messaging over a connected socket.

    Messages are pickled (seq, payload) pairs, each preceded on the wire by a
    4-byte little-endian length.  A payload whose first item is "call" is a
    request and is executed against the objects in self.objtable; any other
    payload is a response, matched to its request by sequence number.

    The thread that constructed the instance does all the socket reading;
    other threads wait on a condition variable for their response.
    """

    debugging = 0

    # Sequence numbers advance in steps of two from this starting value.
    nextseq = 0

    # Incremental packet-reader state.
    buffer = ""
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap the connected socket *sock*.

        *objtable* maps object ids to callable targets and defaults to the
        module-level objecttable; *debugging*, when given, overrides the
        class-level flag.
        """
        self.mainthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.statelock = threading.Lock()
        self.responses = {}
        self.cvars = {}

    def close(self):
        """Close the socket, if any, and forget it."""
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def debug(self, *args):
        """Write the thread name and *args* to sys.__stderr__ when debugging."""
        if not self.debugging:
            return
        words = [str(threading.currentThread().getName())]
        words.extend([str(a) for a in args])
        sys.__stderr__.write(" ".join(words) + "\n")

    def register(self, oid, object):
        """Make *object* callable by the peer under the id *oid*."""
        self.objtable[oid] = object

    def unregister(self, oid):
        """Remove *oid* from the object table; unknown ids are ignored."""
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, request):
        """Execute a "call" request locally and return a (how, what) response.

        *how* is "OK" (what = return value), "ERROR" (what = message) or
        "EXCEPTION" (what = (module, name, args, extracted traceback)).
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not unpackable at all; ValueError: wrong item count.
            return ("ERROR", "Bad request format")
        assert how == "call"
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %s" % repr(oid))
        obj = self.objtable[oid]
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %s" % repr(methodname))
        method = getattr(obj, methodname)
        try:
            ret = method(*args, **kwargs)
            if isinstance(ret, RemoteObject):
                ret = remoteref(ret)
            return ("OK", ret)
        except:
            # Deliberately catch everything: whatever the target raised is
            # described in the response instead of propagating here.
            ##traceback.print_exc(file=sys.__stderr__)
            typ, val, tb = info = sys.exc_info()
            sys.last_type, sys.last_value, sys.last_traceback = info
            if isinstance(typ, type(Exception)):
                # Class exceptions
                mod = typ.__module__
                name = typ.__name__
                if issubclass(typ, Exception):
                    exc_args = val.args
                else:
                    exc_args = (str(val),)
            else:
                # String exceptions
                mod = None
                name = typ
                exc_args = (str(val),)
            return ("EXCEPTION",
                    (mod, name, exc_args, traceback.extract_tb(tb)))

    def remotecall(self, oid, methodname, args, kwargs):
        """Call *methodname* on the peer's object *oid* and wait for the result."""
        self.debug("remotecall:", oid, methodname, args, kwargs)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        """Send a call request and return its sequence number without waiting."""
        return self.putrequest(("call", (oid, methodname, args, kwargs)))

    def asyncreturn(self, seq):
        """Wait for the response to request *seq* and decode it."""
        return self.decoderesponse(self.getresponse(seq))

    def decoderesponse(self, response):
        """Turn a (how, what) response into a return value or an exception.

        "OK" returns *what*.  "EXCEPTION" stores the remote traceback on
        self.traceback and re-raises the remote exception class when it can be
        found by module and name, otherwise raises RuntimeError.  "ERROR"
        raises RuntimeError(what); anything else raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "EXCEPTION":
            mod, name, args, tb = what
            self.traceback = tb
            if mod:
                try:
                    __import__(mod)
                    module = sys.modules[mod]
                except ImportError:
                    pass
                else:
                    try:
                        cls = getattr(module, name)
                    except AttributeError:
                        pass
                    else:
                        # Use the class found in sys.modules[mod]:
                        # __import__("a.b") returns package "a", so looking
                        # the name up on its result fails for dotted modules.
                        raise cls(*args)
            # This used to raise a string exception, which could only be
            # caught by a bare except and is rejected by later interpreters.
            raise RuntimeError("%s: %s" % (name, args))
        if how == "ERROR":
            raise RuntimeError(what)
        raise SystemError(how, what)

    def mainloop(self):
        """Serve incoming requests until the connection reports EOF."""
        try:
            self.getresponse(None)
        except EOFError:
            pass

    def getresponse(self, myseq):
        """Return the response for *myseq*, proxifying an "OK" payload."""
        response = self._getresponse(myseq)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        """Replace RemoteProxy tokens (also inside lists) with RPCProxy objects."""
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq):
        """Block until the response with sequence number *myseq* is available."""
        if threading.currentThread() is self.mainthread:
            # Main thread: does all reading of requests and responses
            while 1:
                response = self.pollresponse(myseq, None)
                if response is not None:
                    return response
        else:
            # Auxiliary thread: wait for notification from main thread
            cvar = threading.Condition(self.statelock)
            self.statelock.acquire()
            try:
                self.cvars[myseq] = cvar
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                del self.responses[myseq]
                del self.cvars[myseq]
            finally:
                # Release even if wait() is interrupted, so the reading
                # thread cannot block forever on statelock.
                self.statelock.release()
            return response

    def putrequest(self, request):
        """Send *request* under a fresh sequence number and return the number."""
        seq = self.newseq()
        self.putmessage((seq, request))
        return seq

    def newseq(self):
        """Advance nextseq by two and return the new value."""
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message* and send it with a 4-byte length prefix.

        An unpicklable message is reported on sys.__stderr__ and the pickling
        error is re-raised.
        """
        try:
            s = pickle.dumps(message)
        except:
            sys.__stderr__.write("Cannot pickle: %s\n" % repr(message))
            raise
        s = struct.pack("<i", len(s)) + s
        # send() may accept only part of the data; loop until all is gone.
        while len(s) > 0:
            n = self.sock.send(s)
            s = s[n:]

    def ioready(self, wait=0.0):
        """Return true if the socket is readable within *wait* seconds."""
        r, w, x = select.select([self.sock.fileno()], [], [], wait)
        return len(r)

    def pollpacket(self, wait=0.0):
        """Return the next complete packet, or None if it is not yet complete.

        Reads from the socket at most once, waiting up to *wait* seconds for
        data.  Raises EOFError on a socket error or when the peer has closed
        the connection.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            if not self.ioready(wait):
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        """Consume the 4-byte length prefix once enough bytes are buffered."""
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        """Return the packet body once fully buffered; otherwise return None."""
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait=0.0):
        """Return the next unpickled message, or None if none is complete.

        A packet that cannot be unpickled is reported on sys.__stderr__ and
        the error is re-raised.
        """
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # SECURITY: unpickling can execute arbitrary code, so the peer on
            # this socket must be trusted.
            message = pickle.loads(packet)
        except:
            err = sys.__stderr__
            err.write("-----------------------\n")
            err.write("cannot unpickle packet: %s\n" % repr(packet))
            traceback.print_stack(file=err)
            err.write("-----------------------\n")
            raise
        return message

    def pollresponse(self, myseq, wait=0.0):
        """Process buffered messages until the response for *myseq* arrives.

        Incoming "call" requests are executed and answered immediately.
        Responses for other sequence numbers are stored for their waiting
        threads.  Returns None when no further message is available.
        """
        # Loop while there's no more buffered input or until specific response
        while 1:
            message = self.pollmessage(wait)
            if message is None:
                return None
            wait = 0.0
            seq, resq = message
            if resq[0] == "call":
                response = self.localcall(resq)
                self.putmessage((seq, response))
                continue
            elif seq == myseq:
                return resq
            else:
                self.statelock.acquire()
                try:
                    self.responses[seq] = resq
                    cv = self.cvars.get(seq)
                    if cv is not None:
                        cv.notify()
                finally:
                    self.statelock.release()
                continue
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000367
368#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000369
class RemoteObject:
    """Token mix-in class.

    SocketIO.localcall() replaces a return value that is an instance of this
    class with a remote reference (see remoteref()).
    """
373
def remoteref(obj):
    """Store *obj* in objecttable under id(obj) and return a RemoteProxy for it."""
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
378
class RemoteProxy:
    """Token that carries only the object id *oid* of a remote object."""

    def __init__(self, oid):
        self.oid = oid
383
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    """Request handler that serves RPC traffic on the connected socket."""

    debugging = 0

    def __init__(self, sock, addr, svr):
        """Record this handler on the server, then set up both base classes."""
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by SocketServer"
        self.mainloop()

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
399
class RPCClient(SocketIO):
    """Listening end of the connection.

    The constructor only binds and listens; accept() must be called to obtain
    the connected socket and initialise the SocketIO state.
    """

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a socket bound to *address* and start listening on it."""
        sock = socket.socket(family, type)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(address)
            sock.listen(1)
        except:
            # Do not leak the descriptor when bind()/listen() fails.
            sock.close()
            raise
        self.sock = sock

    def accept(self):
        """Accept one connection and start using it for RPC.

        Only connections from 127.0.0.1 are accepted; any other peer is
        closed and socket.error is raised.
        """
        newsock, address = self.sock.accept()
        if address[0] == '127.0.0.1':
            sys.__stderr__.write(
                "Idle accepted connection from  %s\n" % (address,))
            SocketIO.__init__(self, newsock)
        else:
            # Close the rejected connection instead of leaving it open.
            newsock.close()
            sys.__stderr__.write("Invalid host:  %s\n" % (address,))
            raise socket.error("connection from invalid host rejected")

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
421
class RPCProxy:
    """Local stand-in for the remote object *oid* reachable through *sockio*.

    The remote method and attribute name tables are fetched on first use.
    """

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Return a MethodProxy for a remote method called *name*.

        Raises AttributeError when *name* is neither a remote method nor a
        remote attribute.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name in self.__attributes:
            # NOTE(review): a known remote attribute yields None here; its
            # value is never fetched.  Kept as in the original -- confirm
            # whether callers rely on this.
            return None
        raise AttributeError(name)
    __getattr__.DebuggerStepThrough=1

    def __getattributes(self):
        """Fetch the remote object's attribute-name table."""
        names = self.sockio.remotecall(self.oid, "__attributes__", (), {})
        self.__attributes = names

    def __getmethods(self):
        """Fetch the remote object's method-name table."""
        names = self.sockio.remotecall(self.oid, "__methods__", (), {})
        self.__methods = names
448 "__methods__", (), {})
449
450def _getmethods(obj, methods):
451 # Helper to get a list of methods from an object
452 # Adds names to dictionary argument 'methods'
453 for name in dir(obj):
454 attr = getattr(obj, name)
455 if callable(attr):
456 methods[name] = 1
457 if type(obj) == types.InstanceType:
458 _getmethods(obj.__class__, methods)
459 if type(obj) == types.ClassType:
460 for super in obj.__bases__:
461 _getmethods(super, methods)
462
463def _getattributes(obj, attributes):
464 for name in dir(obj):
465 attr = getattr(obj, name)
466 if not callable(attr):
467 attributes[name] = 1
468
class MethodProxy:
    """Callable that forwards to the method *name* of remote object *oid*."""

    def __init__(self, sockio, oid, name):
        self.sockio = sockio
        self.oid = oid
        self.name = name

    def __call__(self, *args, **kwargs):
        """Perform the remote call and return its result."""
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
478 return value
479
480#
481# Self Test
482#
483
def testServer(addr):
    """Self test: connect to the client at *addr* and serve one session."""
    class RemotePerson:
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print("(someone called greet)")
            print("Hello %s, I am %s." % (name, self.name))
            print("")
        def getName(self):
            print("(someone called getName)")
            print("")
            return self.name
        def greet_this_guy(self, name):
            print("(someone called greet_this_guy)")
            print("About to greet %s ..." % name)
            remote_guy = self.server.current_handler.get_remote_proxy(name)
            remote_guy.greet("Thomas Edison")
            print("Done.")
            print("")

    person = RemotePerson("Thomas Edison")
    svr = RPCServer(addr)
    # RPCServer has no register() method (it is commented out), so calling
    # svr.register() raised AttributeError.  The handler's SocketIO uses the
    # module-level objecttable by default, so publish the object there.
    objecttable['thomas'] = person
    person.server = svr # only required if callbacks are used

    # svr.serve_forever()
    svr.handle_request() # process once only
512
def testClient(addr):
    "demonstrates RPC Client"
    import time
    clt=RPCClient(addr)
    # RPCClient() only binds and listens.  Without accept() the SocketIO
    # state is never initialised and requests would be sent on the listening
    # socket, so wait for the server to connect first.
    clt.accept()
    thomas = clt.get_remote_proxy("thomas")
    print("The remote person's name is ...")
    print(thomas.getName())
    # print clt.remotecall("thomas", "getName", (), {})
    print("")
    time.sleep(1)
    print("Getting remote thomas to say hi...")
    thomas.greet("Alexander Bell")
    #clt.remotecall("thomas","greet",("Alexander Bell",), {})
    print("Done.")
    print("")
    time.sleep(2)
    # demonstrates remote server calling local instance
    class LocalPerson:
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print("You've greeted me!")
        def getName(self):
            return self.name
    person = LocalPerson("Alexander Bell")
    clt.register("alexander",person)
    thomas.greet_this_guy("alexander")
    # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
542
def test():
    """Run the server demo if the only argument is '-server', else the client demo."""
    addr = ("localhost", 8833)
    run_server = len(sys.argv) == 2 and sys.argv[1] == '-server'
    if run_server:
        testServer(addr)
    else:
        testClient(addr)
550
# Run the self test when this file is executed as a script.
if __name__ == '__main__':
    test()
553
554