blob: 60a6335ea11cfc3b039c5aef821efd68c732efe4 [file] [log] [blame]
# ASCII-art documentation
#
#  +---------------------------------+ +----------+
#  | SocketServer.BaseRequestHandler | | SocketIO |
#  +---------------------------------+ +----------+
#                  ^                      ^  ^
#                  |                      |  |
#                  | + -------------------+  |
#                  | |                       |
#  +-------------------------+        +-----------------+
#  | RPCHandler              |        | RPCClient       |
#  |-------------------------|        |-----------------|
#  | register()              |        | remotecall()    |
#  | unregister()            |        | register()      |
#  |                         |        | unregister()    |
#  |                         |        | get_remote_proxy|
#  +-------------------------+        +-----------------+
#
19import sys
20import socket
21import select
22import SocketServer
23import struct
24import cPickle as pickle
25import threading
26import traceback
27import copy_reg
28import types
29import marshal
30
def unpickle_code(ms):
    """Rebuild a code object from the marshal string *ms*.

    This is the constructor half of the code-object pickling pair; it is
    handed the single argument that pickle_code() packed.
    """
    code = marshal.loads(ms)
    # Guard against a payload that unmarshals to something other than code.
    assert isinstance(code, types.CodeType)
    return code


def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns the (constructor, args) pair expected from a copy_reg reduction
    function: unpickle_code plus a 1-tuple holding the marshalled code.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
40
def unpickle_function(ms):
    """Constructor half of the function pickling pair.

    Functions are not shipped across the connection; the receiver simply
    gets the string *ms* that pickle_function() produced.
    """
    return ms


def pickle_function(fn):
    """Reduce the function *fn* to its repr() for pickling.

    Returns the (constructor, args) pair expected from a copy_reg reduction
    function, so that unpickling yields the repr string of *fn*.

    The previous version referenced ``type.FunctionType`` (a typo for
    ``types.FunctionType``) and therefore raised AttributeError on every
    call.  It also returned a bare string, which pickle interprets as the
    name of a global to look up rather than as data.
    """
    assert isinstance(fn, types.FunctionType)
    return unpickle_function, (repr(fn),)
47
# Teach pickle how to reduce code objects and functions using the helper
# pairs defined above (reduction function, then its constructor).
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

# Maximum number of bytes requested from the socket by a single recv() call.
BUFSIZE = 8*1024
52
class RPCServer(SocketServer.TCPServer):
    """TCP server whose request handler class defaults to RPCHandler."""

    def __init__(self, addr, handlerclass=None):
        """Create the server on *addr*.

        addr         -- server address, passed straight to TCPServer
        handlerclass -- request handler class; RPCHandler when None
        """
        if handlerclass is None:
            handlerclass = RPCHandler
# XXX KBK 25Jun02 Not used in Idlefork, see register/unregister note below.
#        self.objtable = objecttable
        SocketServer.TCPServer.__init__(self, addr, handlerclass)

    # XXX KBK 25Jun02 Following method is not used (yet)
#    def verify_request(self, request, client_address):
#        host, port = client_address
#        if host != "127.0.0.1":
#            print "Disallowed host:", host
#            return 0
#        else:
#            return 1

# XXX KBK 25Jun02 The handlerclass is expected to provide register/unregister
#                 methods.  In Idle, RPCServer is instantiated with
#                 handlerclass MyHandler, which in turn inherits the
#                 register/unregister methods from the mix-in class SocketIO.
#                 It is true that this is asymmetric with the RPCClient's use
#                 of register/unregister, but I guess that's how a SocketServer
#                 is supposed to work.

#                 Exactly how this gets set up is convoluted.  When the
#                 TCPServer is instantiated, it creates an instance of
#                 run.MyHandler and calls its handle() method.  handle()
#                 instantiates a run.Executive, passing it a reference to the
#                 MyHandler object.  That reference is saved as an attribute of
#                 the Executive instance.  The Executive methods have access to
#                 the reference and can pass it on to entities that they
#                 command (e.g. RemoteDebugger.Debugger.start_debugger()).  The
#                 latter, in turn, can call MyHandler(SocketIO)
#                 register/unregister methods via the reference to register and
#                 unregister themselves.  Whew.

    # The following two methods are not currently used in Idlefork.
#    def register(self, oid, object):
#        self.objtable[oid] = object

#    def unregister(self, oid):
#        try:
#            del self.objtable[oid]
#        except KeyError:
#            pass
Chui Tey5d2af632002-05-26 13:36:41 +0000100
101
# Module-wide default mapping of object id -> object.  SocketIO instances
# use it when no objtable is passed in, and remoteref() stores into it.
objecttable = {}
103
class SocketIO:
    """RPC endpoint that exchanges length-prefixed pickles over a socket.

    Every message on the wire is a pickled (seq, payload) tuple preceded by
    its length packed as a 4-byte little-endian integer.  A payload whose
    first item is "call" is a request to be executed locally; anything else
    is the response to an earlier request with the same sequence number.

    The thread that created the instance (the "main thread") performs all
    reading from the socket.  Other threads wait on a condition variable
    until the main thread has stored their response.
    """

    debugging = 0

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap *sock*.

        sock      -- connected socket used for all traffic
        objtable  -- mapping of object id -> object served to the peer;
                     the module-level objecttable when None
        debugging -- overrides the class-level debugging flag when not None
        """
        self.mainthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.statelock = threading.Lock()
        self.responses = {}     # seq -> response parked for another thread
        self.cvars = {}         # seq -> Condition of the waiting thread

    def close(self):
        """Close the socket (if any) and forget it; safe to call twice."""
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def debug(self, *args):
        """Write the thread name and *args* to the real stderr if debugging."""
        if not self.debugging:
            return
        parts = [str(threading.currentThread().getName())]
        for arg in args:
            parts.append(str(arg))
        sys.__stderr__.write(" ".join(parts) + "\n")

    def register(self, oid, object):
        """Make *object* callable by the peer under the id *oid*."""
        self.objtable[oid] = object

    def unregister(self, oid):
        """Remove *oid* from the object table; unknown ids are ignored."""
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, request):
        """Execute a request received from the peer.

        *request* must be ("call", (oid, methodname, args, kwargs)).

        Returns one of:
          ("OK", value)                         -- the call succeeded
          ("ERROR", message)                    -- the request was unusable
          ("EXCEPTION", (mod, name, args, tb))  -- the method raised
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not a sequence; ValueError: wrong number of items.
            return ("ERROR", "Bad request format")
        if how != "call":
            return ("ERROR", "Bad request format")
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %s" % repr(oid))
        obj = self.objtable[oid]
        # Two pseudo-methods let the peer introspect the object.
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %s" % repr(methodname))
        method = getattr(obj, methodname)
        try:
            ret = method(*args, **kwargs)
            if isinstance(ret, RemoteObject):
                # Ship a reference instead of the object itself.
                ret = remoteref(ret)
            return ("OK", ret)
        except:
            # Deliberately catch everything: whatever the method raised is
            # reported to the peer rather than propagated locally.
            ##traceback.print_exc(file=sys.__stderr__)
            typ, val, tb = info = sys.exc_info()
            sys.last_type, sys.last_value, sys.last_traceback = info
            if isinstance(typ, type(Exception)):
                # Class exceptions
                mod = typ.__module__
                name = typ.__name__
                if issubclass(typ, Exception):
                    exc_args = val.args
                else:
                    exc_args = (str(val),)
            else:
                # String exceptions
                mod = None
                name = typ
                exc_args = (str(val),)
            tb = traceback.extract_tb(tb)
            return ("EXCEPTION", (mod, name, exc_args, tb))

    def remotecall(self, oid, methodname, args, kwargs):
        """Call *methodname* on the peer's object *oid* and wait for the result."""
        self.debug("remotecall:", oid, methodname, args, kwargs)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        """Send a call request and return its sequence number immediately."""
        request = ("call", (oid, methodname, args, kwargs))
        seq = self.putrequest(request)
        return seq

    def asyncreturn(self, seq):
        """Wait for the response to request *seq* and decode it."""
        response = self.getresponse(seq)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a (how, what) response into a return value or an exception.

        "OK"        -- return *what*
        "EXCEPTION" -- re-raise the remote exception locally.  The class is
                       looked up by module and name; if it cannot be found
                       a RuntimeError naming it is raised instead.  The
                       remote traceback is kept in self.traceback.
        "ERROR"     -- raise RuntimeError(what)
        other       -- raise SystemError(how, what)
        """
        how, what = response
        if how == "OK":
            return what
        if how == "EXCEPTION":
            mod, name, args, tb = what
            self.traceback = tb
            if mod:
                try:
                    __import__(mod)
                    # __import__ returns the top-level package for dotted
                    # names, so fetch the real module from sys.modules.
                    module = sys.modules[mod]
                except ImportError:
                    pass
                else:
                    cls = getattr(module, name, None)
                    if cls is not None:
                        raise cls(*args)
            # The class is unavailable here (or the peer raised a string
            # exception); report it with a catchable exception type.
            raise RuntimeError("%s: %r" % (name, args))
        if how == "ERROR":
            raise RuntimeError(what)
        raise SystemError(how, what)

    def mainloop(self):
        """Serve incoming requests until the connection reaches EOF."""
        try:
            self.getresponse(None)
        except EOFError:
            pass

    def getresponse(self, myseq):
        """Return the response for *myseq*, proxifying an "OK" payload."""
        response = self._getresponse(myseq)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        """Replace RemoteProxy markers (also inside lists) by RPCProxy objects."""
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq):
        """Block until the response with sequence number *myseq* is available."""
        if threading.currentThread() is self.mainthread:
            # Main thread: does all reading of requests and responses
            while 1:
                response = self.pollresponse(myseq, None)
                if response is not None:
                    return response
        else:
            # Auxiliary thread: wait for notification from main thread
            cvar = threading.Condition(self.statelock)
            self.statelock.acquire()
            try:
                self.cvars[myseq] = cvar
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                del self.responses[myseq]
            finally:
                # Always unregister and unlock, even if wait() is interrupted.
                if myseq in self.cvars:
                    del self.cvars[myseq]
                self.statelock.release()
            return response

    def putrequest(self, request):
        """Send *request* under a fresh sequence number and return that number."""
        seq = self.newseq()
        self.putmessage((seq, request))
        return seq

    nextseq = 0

    def newseq(self):
        """Return the next sequence number; numbers advance in steps of 2."""
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message* and write it, length-prefixed, to the socket."""
        try:
            s = pickle.dumps(message)
        except:
            sys.__stderr__.write("Cannot pickle: %s\n" % repr(message))
            raise
        s = struct.pack("<i", len(s)) + s
        # send() may accept only part of the data; loop until all is sent.
        while len(s) > 0:
            n = self.sock.send(s)
            s = s[n:]

    def ioready(self, wait=0.0):
        """Return true if the socket is readable; *wait* is the select timeout."""
        r, w, x = select.select([self.sock.fileno()], [], [], wait)
        return len(r)

    buffer = ""
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait=0.0):
        """Return the next complete packet, or None if it is not yet available.

        Raises EOFError when the socket reports an error or the peer has
        closed the connection.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            if not self.ioready(wait):
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        """Consume the 4-byte length header once it is fully buffered."""
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        """Return the packet body once fully buffered (implicitly None otherwise)."""
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait=0.0):
        """Return the next unpickled message, or None if no packet is ready."""
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(review): unpickling data received from a socket executes
            # whatever the peer sent; only use this with a trusted peer.
            message = pickle.loads(packet)
        except:
            err = sys.__stderr__
            err.write("-----------------------\n")
            err.write("cannot unpickle packet: %s\n" % repr(packet))
            traceback.print_stack(file=err)
            err.write("-----------------------\n")
            raise
        return message

    def pollresponse(self, myseq, wait=0.0):
        """Process incoming messages; return the response for *myseq* or None.

        Incoming "call" requests are executed and answered on the spot.
        Responses meant for other sequence numbers are stored in
        self.responses and the waiting thread, if any, is notified.
        Only the first read honours *wait*; later reads do not block.
        """
        # Loop until there is no more buffered input or until the specific
        # response arrives
        while 1:
            message = self.pollmessage(wait)
            if message is None:
                return None
            wait = 0.0
            seq, resq = message
            if resq[0] == "call":
                response = self.localcall(resq)
                self.putmessage((seq, response))
                continue
            elif seq == myseq:
                return resq
            else:
                self.statelock.acquire()
                try:
                    self.responses[seq] = resq
                    cv = self.cvars.get(seq)
                    if cv is not None:
                        cv.notify()
                finally:
                    self.statelock.release()
                continue
366 continue
367
class RemoteObject:
    # Token mix-in class: SocketIO.localcall() checks return values with
    # isinstance(ret, RemoteObject) and, when it matches, sends the result
    # of remoteref(ret) instead of the object itself.
    pass
371
def remoteref(obj):
    """Publish *obj* in the module object table and return a proxy token.

    The object is stored under its id() so the peer can address it later;
    the returned RemoteProxy carries only that id.
    """
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
376
class RemoteProxy:
    """Marker carrying the object id of something living on the sending side."""

    def __init__(self, oid):
        # The id is the only state; the receiver uses it to build a proxy.
        self.oid = oid
381
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    """Server-side request handler that speaks the SocketIO protocol."""

    debugging = 0

    def __init__(self, sock, addr, svr):
        """Announce this handler on the server, then initialise both bases."""
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def setup(self):
        """Run the base-class setup, then log the peer address to stderr."""
        SocketServer.BaseRequestHandler.setup(self)
        sys.__stderr__.write("Connection from %s\n" % str(self.client_address))

    def finish(self):
        """Log the end of the connection to stderr, then run base-class finish."""
        sys.__stderr__.write(
            "End connection from %s\n" % str(self.client_address))
        SocketServer.BaseRequestHandler.finish(self)

    def handle(self):
        """Handle the connection by running the SocketIO main loop."""
        self.mainloop()

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
404
class RPCClient(SocketIO):
    """Client end of an RPC connection; opens its own socket."""

    nextseq = 1 # Requests coming from the client are odd

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Connect to *address* and initialise the SocketIO machinery.

        address -- address passed to socket.connect()
        family  -- socket address family
        type    -- socket type

        Any exception raised by connect() is propagated after the freshly
        created socket has been closed.
        """
        sock = socket.socket(family, type)
        try:
            sock.connect(address)
        except:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        SocketIO.__init__(self, sock)

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
416
class RPCProxy:
    """Local stand-in for an object that lives on the other side of *sockio*.

    The remote object's method and attribute names are fetched lazily, on
    the first attribute lookup that needs them, and cached on the instance.
    """

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Resolve *name* against the remote object's methods and attributes.

        Returns a MethodProxy for a remote method.  Raises AttributeError
        when the name is neither a remote method nor a remote attribute.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name in self.__attributes:
            # NOTE(review): a known remote attribute yields None here; its
            # value is never fetched from the peer.  Looks unfinished --
            # confirm the intended behaviour before relying on it.
            return None
        raise AttributeError(name)
    __getattr__.DebuggerStepThrough=1

    def __getattributes(self):
        """Fetch and cache the remote object's non-callable attribute names."""
        self.__attributes = self.sockio.remotecall(
            self.oid, "__attributes__", (), {})

    def __getmethods(self):
        """Fetch and cache the remote object's callable attribute names."""
        self.__methods = self.sockio.remotecall(
            self.oid, "__methods__", (), {})
444
445def _getmethods(obj, methods):
446 # Helper to get a list of methods from an object
447 # Adds names to dictionary argument 'methods'
448 for name in dir(obj):
449 attr = getattr(obj, name)
450 if callable(attr):
451 methods[name] = 1
452 if type(obj) == types.InstanceType:
453 _getmethods(obj.__class__, methods)
454 if type(obj) == types.ClassType:
455 for super in obj.__bases__:
456 _getmethods(super, methods)
457
458def _getattributes(obj, attributes):
459 for name in dir(obj):
460 attr = getattr(obj, name)
461 if not callable(attr):
462 attributes[name] = 1
463
class MethodProxy:
    """Callable that forwards its arguments to one method of a remote object."""

    def __init__(self, sockio, oid, name):
        self.sockio, self.oid, self.name = sockio, oid, name

    def __call__(self, *args, **kwargs):
        """Invoke the remote method and return whatever remotecall() returns."""
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
474
475#
476# Self Test
477#
478
def testServer(addr):
    """Self-test server: publish a RemotePerson as 'thomas', serve one connection.

    The person is placed in the module-level objecttable, which is the
    table RPCHandler (through SocketIO) serves objects from.  RPCServer
    itself has no register() method, so the former svr.register() call
    failed with AttributeError.
    """
    def say(text=""):
        sys.stdout.write(text + "\n")

    class RemotePerson:
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            say("(someone called greet)")
            say("Hello %s, I am %s." % (name, self.name))
            say()
        def getName(self):
            say("(someone called getName)")
            say()
            return self.name
        def greet_this_guy(self, name):
            say("(someone called greet_this_guy)")
            say("About to greet %s ..." % name)
            # Call back into the client through the active handler.
            remote_guy = self.server.current_handler.get_remote_proxy(name)
            remote_guy.greet("Thomas Edison")
            say("Done.")
            say()

    person = RemotePerson("Thomas Edison")
    svr = RPCServer(addr)
    try:
        objecttable['thomas'] = person
        person.server = svr # only required if callbacks are used

        # svr.serve_forever()
        svr.handle_request()  # process once only
    finally:
        # Release the listening socket once the single request is done.
        svr.server_close()
506
def testClient(addr):
    """Self-test client: call the remote 'thomas', then let it call back.

    The connection is closed when the demonstration ends, whether it
    finishes normally or with an exception.
    """
    #
    # demonstrates RPC Client
    #
    import time

    def say(text=""):
        sys.stdout.write(text + "\n")

    clt = RPCClient(addr)
    try:
        thomas = clt.get_remote_proxy("thomas")
        say("The remote person's name is ...")
        say(str(thomas.getName()))
        # print clt.remotecall("thomas", "getName", (), {})
        say()
        time.sleep(1)
        say("Getting remote thomas to say hi...")
        thomas.greet("Alexander Bell")
        #clt.remotecall("thomas","greet",("Alexander Bell",), {})
        say("Done.")
        say()
        time.sleep(2)

        # demonstrates remote server calling local instance
        class LocalPerson:
            def __init__(self,name):
                self.name = name
            def greet(self, name):
                say("You've greeted me!")
            def getName(self):
                return self.name
        person = LocalPerson("Alexander Bell")
        clt.register("alexander",person)
        thomas.greet_this_guy("alexander")
        # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
    finally:
        clt.close()
539
def test():
    """Run the self-test: server side with the single argument '-server', else client."""
    addr = ("localhost", 8833)
    run_server = len(sys.argv) == 2 and sys.argv[1] == '-server'
    if run_server:
        testServer(addr)
    else:
        testClient(addr)
547
# Run the self-test only when executed as a script, not when imported.
if __name__ == '__main__':
    test()
550
551