blob: 4c3ef3e2ddf6e658d94a273c666f2f18b4ab46e0 [file] [log] [blame]
"""RPC Implementation, originally written for the Python Idle IDE

For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection.  Since Idle has
only one client per server, this was not a limitation.

   +---------------------------------+ +-------------+
   | SocketServer.BaseRequestHandler | | SocketIO    |
   +---------------------------------+ +-------------+
                   ^                   | register()  |
                   |                   | unregister()|
                   |                   +-------------+
                   |                      ^  ^
                   |                      |  |
                   | + -------------------+  |
                   | |                       |
   +-------------------------+        +-----------------+
   | RPCHandler              |        | RPCClient       |
   | [attribute of RPCServer]|        |                 |
   +-------------------------+        +-----------------+

The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.

See the Idle run.main() docstring for further information on how this was
accomplished in Idle.

"""
29
30import sys
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000031import os
Chui Tey5d2af632002-05-26 13:36:41 +000032import socket
33import select
34import SocketServer
35import struct
36import cPickle as pickle
37import threading
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000038import Queue
Chui Tey5d2af632002-05-26 13:36:41 +000039import traceback
40import copy_reg
41import types
42import marshal
43
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000044import interrupt
45
def unpickle_code(ms):
    """Rebuild a code object from its marshalled byte string *ms*.

    Raises AssertionError if the unmarshalled value is not a code object.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
50
def pickle_code(co):
    """Reduce code object *co* for pickling.

    Returns the pair (unpickle_code, (marshalled_bytes,)), i.e. the
    reconstructor and the argument tuple it is to be called with.
    Raises AssertionError if *co* is not a code object.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
55
# XXX KBK 24Aug02 function pickling capability not used in Idle
#  def unpickle_function(ms):
#      return ms

#  def pickle_function(fn):
#      assert isinstance(fn, type.FunctionType)
#      return `fn`

# Register the marshal-based reducer/reconstructor pair so that code objects
# can be pickled (and therefore sent as part of an RPC message).
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

# Maximum number of bytes requested from the socket by a single recv() call
# in SocketIO.pollpacket().
BUFSIZE = 8*1024
68
class RPCServer(SocketServer.TCPServer):
    """TCPServer whose socket connects out to its peer instead of listening.

    The bind/listen/accept phases of TCPServer are replaced: the server
    socket is connected to ``server_address`` and that same socket is then
    handed to the request handler.
    """

    def __init__(self, addr, handlerclass=None):
        """Create the server for *addr*; RPCHandler is the default handler."""
        handler = handlerclass
        if handler is None:
            handler = RPCHandler
        SocketServer.TCPServer.__init__(self, addr, handler)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"
        pass

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        peer = self.server_address
        self.socket.connect(peer)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return self.socket, self.server_address

    def handle_error(self, request, client_address):
        """Override TCPServer method

        Error message goes to __stderr__.  SystemExit is re-raised without
        a message.  Any other exception is reported and the process is
        terminated with os._exit.

        """
        try:
            # Re-raise the exception currently being handled so it can be
            # dispatched on its type by the except clauses below.
            raise
        except SystemExit:
            raise
        except:
            err = sys.__stderr__
            rule = '-'*40
            print>>err, '\n' + rule
            print>>err, 'Unhandled server exception!'
            print>>err, 'Thread: %s' % threading.currentThread().getName()
            print>>err, 'Client Address: ', client_address
            print>>err, 'Request: ', repr(request)
            traceback.print_exc(file=err)
            print>>err, '\n*** Unrecoverable, server exiting!'
            print>>err, rule
            os._exit(0)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000116
#----------------- end class RPCServer --------------------

# Default table of registered objects (oid -> object); used by every SocketIO
# that is not given its own objtable, and by remoteref().
objecttable = {}
# SocketIO.localcall() puts 'QUEUE' requests on request_queue as
# (seq, (method, args, kwargs)); SocketIO.pollresponse() takes (seq, response)
# items from response_queue and sends them back over the link.
request_queue = Queue.Queue(0)
response_queue = Queue.Queue(0)
122
Chui Tey5d2af632002-05-26 13:36:41 +0000123
class SocketIO:
    """Message transport and RPC dispatch over one connected socket.

    Messages are (seq, payload) tuples, pickled and sent with a 4-byte
    little-endian length prefix.  The thread that constructs the instance
    is the only one that reads the socket; other threads wait on a
    per-request condition variable until the socket thread delivers their
    response.

    This class is used as a mix-in: debug() reads ``self.debugging`` and
    ``self.location``, which subclasses are expected to provide
    (``debugging`` may alternatively be passed to __init__).
    """

    nextseq = 0

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap the connected socket *sock*.

        objtable  -- dict mapping oid -> object; defaults to the module
                     level objecttable.
        debugging -- if not None, overrides the class-level debugging flag.
        """
        self.sockthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.responses = {}     # seq -> response awaiting pickup by a thread
        self.cvars = {}         # seq -> Condition of the waiting thread

    def close(self):
        "Close the socket, if still open, and forget it."
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def exithook(self):
        "override for specific exit action"
        # os._exit() requires an exit status; calling it without one raises
        # TypeError instead of terminating the process.
        os._exit(0)

    def debug(self, *args):
        "Write location, thread name and *args to __stderr__ if debugging."
        if not self.debugging:
            return
        s = self.location + " " + str(threading.currentThread().getName())
        for a in args:
            s = s + " " + str(a)
        print>>sys.__stderr__, s

    def register(self, oid, object):
        "Make *object* callable by the peer under the id *oid*."
        self.objtable[oid] = object

    def unregister(self, oid):
        "Remove *oid* from the object table; unknown ids are ignored."
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, seq, request):
        """Execute or queue a request received from the peer.

        *request* is (how, (oid, methodname, args, kwargs)) with how being
        'CALL' or 'QUEUE'.  Returns a (status, value) tuple where status is
        one of "OK", "QUEUED", "ERROR" or "EXCEPTION".  SystemExit and
        socket.error raised by the called method propagate.
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: request is not a sequence; ValueError: it has the
            # wrong number of items.  Both are malformed requests.
            return ("ERROR", "Bad request format")
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %s" % repr(oid))
        obj = self.objtable[oid]
        # Two pseudo-methods let the peer introspect the object.
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %s" % repr(methodname))
        method = getattr(obj, methodname)
        try:
            if how == 'CALL':
                ret = method(*args, **kwargs)
                if isinstance(ret, RemoteObject):
                    # Send a proxy token instead of the object itself.
                    ret = remoteref(ret)
                return ("OK", ret)
            elif how == 'QUEUE':
                request_queue.put((seq, (method, args, kwargs)))
                return("QUEUED", None)
            else:
                return ("ERROR", "Unsupported message type: %s" % how)
        except SystemExit:
            raise
        except socket.error:
            raise
        except:
            self.debug("localcall:EXCEPTION")
            traceback.print_exc(file=sys.__stderr__)
            return ("EXCEPTION", None)

    def remotecall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request to the peer and return its decoded result."
        self.debug("remotecall:asynccall: ", oid, methodname)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def remotequeue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request to the peer and return its decoded result."
        self.debug("remotequeue:asyncqueue: ", oid, methodname)
        seq = self.asyncqueue(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request; return its sequence number without waiting."
        request = ("CALL", (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.currentThread() != self.sockthread:
            # Only non-socket threads wait on a condition variable.
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncqueue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request; return its sequence number without waiting."
        request = ("QUEUE", (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.currentThread() != self.sockthread:
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncreturn(self, seq):
        "Wait for the response to request *seq* and return its decoded value."
        self.debug("asyncreturn:%d:call getresponse(): " % seq)
        response = self.getresponse(seq, wait=0.05)
        self.debug(("asyncreturn:%d:response: " % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a (how, what) response into a return value or an exception.

        "OK" returns *what*; "QUEUED" and "EXCEPTION" return None; "EOF"
        calls decode_interrupthook(); "ERROR" raises RuntimeError(what);
        any other status raises SystemError(how, what).
        """
        how, what = response
        if how == "OK":
            return what
        if how == "QUEUED":
            return None
        if how == "EXCEPTION":
            self.debug("decoderesponse: EXCEPTION")
            return None
        if how == "EOF":
            self.debug("decoderesponse: EOF")
            self.decode_interrupthook()
            return None
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        raise SystemError(how, what)

    def decode_interrupthook(self):
        "Called when an 'EOF' response is decoded; raises EOFError."
        raise EOFError

    def mainloop(self):
        """Listen on socket until I/O not ready or EOF

        pollresponse() will loop looking for seq number None, which
        never comes, and exit on EOFError.

        """
        try:
            self.getresponse(myseq=None, wait=0.05)
        except EOFError:
            self.debug("mainloop:return")
            return

    def getresponse(self, myseq, wait):
        "Return the response for *myseq*, proxifying the value of an 'OK'."
        response = self._getresponse(myseq, wait)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        "Replace RemoteProxy tokens (also inside lists) by RPCProxy objects."
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, types.ListType):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq, wait):
        """Obtain the raw response for *myseq*.

        On the socket thread this polls the socket until the response
        arrives; on any other thread it blocks on the request's condition
        variable until the socket thread stores the response.
        """
        self.debug("_getresponse:myseq:", myseq)
        if threading.currentThread() is self.sockthread:
            # this thread does all reading of requests or responses
            while 1:
                response = self.pollresponse(myseq, wait)
                if response is not None:
                    return response
        else:
            # wait for notification from socket handling thread
            cvar = self.cvars[myseq]
            cvar.acquire()
            try:
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                self.debug("_getresponse:%s: thread woke up: response: %s" %
                           (myseq, response))
                del self.responses[myseq]
                del self.cvars[myseq]
            finally:
                # Never leave the condition's lock held, even on error.
                cvar.release()
            return response

    def newseq(self):
        "Return the next sequence number (numbers advance in steps of 2)."
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message* and send it with a 4-byte length prefix.

        Raises IOError if the socket has been closed (self.sock is None).
        A socket.error while sending terminates the process via os._exit.
        """
        self.debug("putmessage:%d:" % message[0])
        try:
            s = pickle.dumps(message)
        except pickle.PicklingError:
            # dumps() signals failure with PicklingError, not
            # UnpicklingError; report the offending message and re-raise.
            print >>sys.__stderr__, "Cannot pickle:", repr(message)
            raise
        s = struct.pack("<i", len(s)) + s
        while len(s) > 0:
            try:
                n = self.sock.send(s)
            except AttributeError:
                # socket was closed
                raise IOError
            except socket.error:
                self.debug("putmessage:socketerror:pid:%s" % os.getpid())
                os._exit(0)
            else:
                # send() may transmit only part of the data; loop on the rest.
                s = s[n:]

    def ioready(self, wait):
        "Return true if the socket is readable within *wait* seconds."
        r, w, x = select.select([self.sock.fileno()], [], [], wait)
        return len(r)

    buffer = ""
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait):
        """Return the next complete packet, or None if none is available yet.

        Performs at most one recv() after waiting up to *wait* seconds.
        Raises EOFError on a socket error or when the peer closed the link.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            if not self.ioready(wait):
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        "If a full length prefix is buffered, consume it into bufneed."
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        "If a full packet body is buffered, consume and return it."
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait):
        "Return the next unpickled message, or None if no packet is ready."
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(review): pickle.loads() executes whatever the peer sent;
            # this is only acceptable while the peer is trusted.
            message = pickle.loads(packet)
        except:
            print >>sys.__stderr__, "-----------------------"
            print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
            traceback.print_stack(file=sys.__stderr__)
            print >>sys.__stderr__, "-----------------------"
            raise
        return message

    def pollresponse(self, myseq, wait):
        """Handle messages received on the socket.

        Some messages received may be asynchronous 'call' or 'queue' requests,
        and some may be responses for other threads.

        'call' requests are passed to self.localcall() with the expectation of
        immediate execution, during which time the socket is not serviced.

        'queue' requests are used for tasks (which may block or hang) to be
        processed in a different thread.  These requests are fed into
        request_queue by self.localcall().  Responses to queued requests are
        taken from response_queue and sent across the link with the associated
        sequence numbers.  Messages in the queues are (sequence_number,
        request/response) tuples and code using this module removing messages
        from the request_queue is responsible for returning the correct
        sequence number in the response_queue.

        pollresponse() will loop until a response message with the myseq
        sequence number is received, and will save other responses in
        self.responses and notify the owning thread.

        Returns None when the socket is not ready, on EOF (after calling
        handle_EOF()) and on AttributeError.

        """
        while 1:
            # send queued response if there is one available
            try:
                qmsg = response_queue.get(0)
            except Queue.Empty:
                pass
            else:
                seq, response = qmsg
                message = (seq, ('OK', response))
                self.putmessage(message)
            # poll for message on link
            try:
                message = self.pollmessage(wait)
                if message is None:  # socket not ready
                    return None
            except EOFError:
                self.handle_EOF()
                return None
            except AttributeError:
                # presumably self.sock is None after close() -- TODO confirm
                return None
            seq, resq = message
            how = resq[0]
            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
            # process or queue a request
            if how in ("CALL", "QUEUE"):
                self.debug("pollresponse:%d:localcall:call:" % seq)
                response = self.localcall(seq, resq)
                self.debug("pollresponse:%d:localcall:response:%s"
                           % (seq, response))
                if how == "CALL":
                    self.putmessage((seq, response))
                elif how == "QUEUE":
                    # don't acknowledge the 'queue' request!
                    pass
                continue
            # return if completed message transaction
            elif seq == myseq:
                return resq
            # must be a response for a different thread:
            else:
                cv = self.cvars.get(seq, None)
                # response involving unknown sequence number is discarded,
                # probably intended for prior incarnation of server
                if cv is not None:
                    cv.acquire()
                    self.responses[seq] = resq
                    cv.notify()
                    cv.release()
                continue

    def handle_EOF(self):
        "action taken upon link being closed by peer"
        self.EOFhook()
        self.debug("handle_EOF")
        # Iterate over a snapshot: each woken thread deletes its own entry
        # from self.cvars in _getresponse() while this loop is running.
        for key, cv in list(self.cvars.items()):
            cv.acquire()
            self.responses[key] = ('EOF', None)
            cv.notify()
            cv.release()
        interrupt.interrupt_main()
        # call our (possibly overridden) exit function
        self.exithook()

    def EOFhook(self):
        "Classes using rpc client/server can override to augment EOF action"
        pass
481
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000482#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000483
class RemoteObject:
    """Token mix-in class.

    SocketIO.localcall() tests return values with isinstance() against
    this class and sends a remoteref() token in place of such objects.
    """
487
def remoteref(obj):
    """Register *obj* in objecttable under id(obj); return its RemoteProxy."""
    key = id(obj)
    objecttable[key] = obj
    token = RemoteProxy(key)
    return token
492
class RemoteProxy:
    """Token carrying only the object id (oid) of an object on the peer.

    SocketIO._proxify() converts received instances into RPCProxy objects.
    """

    def __init__(self, oid):
        "Remember the id of the referenced object."
        self.oid = oid
497
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    """Request handler used by RPCServer; serves RPC traffic via SocketIO."""

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        """Initialise both bases for the connected socket *sock*.

        The order of the statements matters: the SocketIO state must exist
        before BaseRequestHandler.__init__ runs.
        NOTE(review): per the SocketServer documentation the base
        constructor itself invokes handle() -- confirm.
        """
        # Let code holding only the server reach the active handler
        # (testServer's greet_this_guy uses svr.current_handler).
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by SocketServer"
        self.mainloop()

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer's object registered as *oid*."
        return RPCProxy(self, oid)
514
class RPCClient(SocketIO):
    """Listening end of the RPC link.

    The constructor only creates the listening socket; SocketIO is not
    initialised until accept() has accepted a connection from 127.0.0.1.
    """

    debugging = False
    location = "#C"  # Client

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        "Create a socket bound to *address* and listen for one connection."
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and start using it for RPC.

        Raises socket.error if the peer is not 127.0.0.1; the rejected
        connection is closed before the exception is raised.
        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            print>>sys.__stderr__, "****** Connection request from ", address
        if address[0] == '127.0.0.1':
            SocketIO.__init__(self, working_sock)
        else:
            print>>sys.__stderr__, "** Invalid host: ", address
            # Do not leave the rejected connection open.
            working_sock.close()
            raise socket.error

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer's object registered as *oid*."
        return RPCProxy(self, oid)
541
class RPCProxy:
    """Local stand-in for an object registered on the peer.

    The peer's method and attribute name tables are fetched on first use
    through sockio.remotecall() and cached on the instance.
    """

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        "Remember the transport and the id of the remote object."
        self.sockio, self.oid = sockio, oid

    def __getattr__(self, name):
        """Resolve *name* against the remote object.

        Returns a MethodProxy for a remote method.  For a name listed in
        the remote attribute table nothing is fetched and None is
        returned; any other name raises AttributeError.
        """
        if self.__methods is None:
            self.__getmethods()
        is_method = self.__methods.get(name)
        if is_method:
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if self.__attributes.has_key(name):
            # NOTE(review): the attribute's value is never retrieved.
            return None
        raise AttributeError(name)

    __getattr__.DebuggerStepThrough = 1

    def __getattributes(self):
        "Fetch and cache the remote object's attribute name table."
        table = self.sockio.remotecall(self.oid, "__attributes__", (), {})
        self.__attributes = table

    def __getmethods(self):
        "Fetch and cache the remote object's method name table."
        table = self.sockio.remotecall(self.oid, "__methods__", (), {})
        self.__methods = table
570
571def _getmethods(obj, methods):
572 # Helper to get a list of methods from an object
573 # Adds names to dictionary argument 'methods'
574 for name in dir(obj):
575 attr = getattr(obj, name)
576 if callable(attr):
577 methods[name] = 1
578 if type(obj) == types.InstanceType:
579 _getmethods(obj.__class__, methods)
580 if type(obj) == types.ClassType:
581 for super in obj.__bases__:
582 _getmethods(super, methods)
583
584def _getattributes(obj, attributes):
585 for name in dir(obj):
586 attr = getattr(obj, name)
587 if not callable(attr):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000588 attributes[name] = 1
Chui Tey5d2af632002-05-26 13:36:41 +0000589
class MethodProxy:
    """Callable that forwards its arguments to one method of a remote object."""

    def __init__(self, sockio, oid, name):
        "Bind the transport, the remote object id and the method name."
        self.sockio, self.oid, self.name = sockio, oid, name

    def __call__(self, *args, **kwargs):
        "Invoke the remote method via sockio.remotecall(); return its result."
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
600
601#
602# Self Test
603#
604
def testServer(addr):
    """Self-test: serve a single request for a 'thomas' object at *addr*.

    NOTE(review): RPCServer as defined in this module has no register()
    method, so svr.register() below presumably fails -- see the XXX note.
    """
    # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
    class RemotePerson:
        # Demo object whose methods are meant to be called by the peer.
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print "(someone called greet)"
            print "Hello %s, I am %s." % (name, self.name)
            print
        def getName(self):
            print "(someone called getName)"
            print
            return self.name
        def greet_this_guy(self, name):
            # Callback: call greet() on the peer's object registered as *name*.
            print "(someone called greet_this_guy)"
            print "About to greet %s ..." % name
            remote_guy = self.server.current_handler.get_remote_proxy(name)
            remote_guy.greet("Thomas Edison")
            print "Done."
            print

    person = RemotePerson("Thomas Edison")
    svr = RPCServer(addr)
    svr.register('thomas', person)
    person.server = svr # only required if callbacks are used

    # svr.serve_forever()
    svr.handle_request()  # process once only
633
def testClient(addr):
    """demonstrates RPC Client

    NOTE(review): clt.accept() is never called here, so SocketIO.__init__
    has not run when the proxy is first used -- presumably part of what
    the XXX note below refers to.
    """
    # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
    import time
    clt=RPCClient(addr)
    thomas = clt.get_remote_proxy("thomas")
    print "The remote person's name is ..."
    print thomas.getName()
    # print clt.remotecall("thomas", "getName", (), {})
    print
    time.sleep(1)
    print "Getting remote thomas to say hi..."
    thomas.greet("Alexander Bell")
    #clt.remotecall("thomas","greet",("Alexander Bell",), {})
    print "Done."
    print
    time.sleep(2)
    # demonstrates remote server calling local instance
    class LocalPerson:
        # Demo object registered locally so that the peer can call back.
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print "You've greeted me!"
        def getName(self):
            return self.name
    person = LocalPerson("Alexander Bell")
    clt.register("alexander",person)
    thomas.greet_this_guy("alexander")
    # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
663
def test():
    """Run the self test on localhost:8833.

    With '-server' as the only command line argument the server side is
    run; otherwise the client side.
    """
    addr = ("localhost", 8833)
    run_server = len(sys.argv) == 2 and sys.argv[1] == '-server'
    if run_server:
        testServer(addr)
    else:
        testClient(addr)
671
# Run the self test only when executed as a script, not when imported.
if __name__ == '__main__':
    test()