blob: 658aaf37f5543720d2dd1d221a7042c769c8b0d2 [file] [log] [blame]
Kurt B. Kaiserb4179362002-07-26 00:06:42 +00001"""RPC Implemention, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
5has only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000031import os
Chui Tey5d2af632002-05-26 13:36:41 +000032import socket
33import select
34import SocketServer
35import struct
36import cPickle as pickle
37import threading
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000038import Queue
Chui Tey5d2af632002-05-26 13:36:41 +000039import traceback
40import copy_reg
41import types
42import marshal
43
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000044
Chui Tey5d2af632002-05-26 13:36:41 +000045def unpickle_code(ms):
46 co = marshal.loads(ms)
47 assert isinstance(co, types.CodeType)
48 return co
49
50def pickle_code(co):
51 assert isinstance(co, types.CodeType)
52 ms = marshal.dumps(co)
53 return unpickle_code, (ms,)
54
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000055# XXX KBK 24Aug02 function pickling capability not used in Idle
56# def unpickle_function(ms):
57# return ms
Chui Tey5d2af632002-05-26 13:36:41 +000058
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000059# def pickle_function(fn):
60# assert isinstance(fn, type.FunctionType)
61# return `fn`
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +000062
Chui Tey5d2af632002-05-26 13:36:41 +000063copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000064# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
Chui Tey5d2af632002-05-26 13:36:41 +000065
66BUFSIZE = 8*1024
67
68class RPCServer(SocketServer.TCPServer):
69
70 def __init__(self, addr, handlerclass=None):
71 if handlerclass is None:
72 handlerclass = RPCHandler
Chui Tey5d2af632002-05-26 13:36:41 +000073 SocketServer.TCPServer.__init__(self, addr, handlerclass)
74
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000075 def server_bind(self):
76 "Override TCPServer method, no bind() phase for connecting entity"
77 pass
Chui Tey5d2af632002-05-26 13:36:41 +000078
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000079 def server_activate(self):
80 """Override TCPServer method, connect() instead of listen()
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +000081
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000082 Due to the reversed connection, self.server_address is actually the
83 address of the Idle Client to which we are connecting.
Chui Tey5d2af632002-05-26 13:36:41 +000084
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000085 """
86 self.socket.connect(self.server_address)
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +000087
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000088 def get_request(self):
89 "Override TCPServer method, return already connected socket"
90 return self.socket, self.server_address
Chui Tey5d2af632002-05-26 13:36:41 +000091
Kurt B. Kaiser003091c2003-02-17 18:57:16 +000092 def handle_error(self, request, client_address):
Kurt B. Kaisere51529d2003-03-22 19:15:58 +000093 """Override TCPServer method
94
95 Error message goes to __stderr__. No error message if exiting
96 normally or socket raised EOF. Other exceptions not handled in
97 server code will cause os._exit.
98
99 """
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000100 try:
101 raise
102 except SystemExit:
103 raise
Kurt B. Kaisere51529d2003-03-22 19:15:58 +0000104 except:
Kurt B. Kaiser05293772003-03-22 20:11:14 +0000105 erf = sys.__stderr__
106 print>>erf, '\n' + '-'*40
Kurt B. Kaisere51529d2003-03-22 19:15:58 +0000107 print>>erf, 'Unhandled server exception!'
108 print>>erf, 'Thread: %s' % threading.currentThread().getName()
Kurt B. Kaiser05293772003-03-22 20:11:14 +0000109 print>>erf, 'Client Address: ', client_address
Kurt B. Kaisere51529d2003-03-22 19:15:58 +0000110 print>>erf, 'Request: ', repr(request)
111 traceback.print_exc(file=erf)
112 print>>erf, '\n*** Unrecoverable, server exiting!'
113 print>>erf, '-'*40
Kurt B. Kaiser05293772003-03-22 20:11:14 +0000114 os._exit(0)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000115
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000116#----------------- end class RPCServer --------------------
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000117
Chui Tey5d2af632002-05-26 13:36:41 +0000118objecttable = {}
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000119request_queue = Queue.Queue(0)
120response_queue = Queue.Queue(0)
121
Chui Tey5d2af632002-05-26 13:36:41 +0000122
123class SocketIO:
124
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000125 nextseq = 0
126
Chui Tey5d2af632002-05-26 13:36:41 +0000127 def __init__(self, sock, objtable=None, debugging=None):
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000128 self.sockthread = threading.currentThread()
Chui Tey5d2af632002-05-26 13:36:41 +0000129 if debugging is not None:
130 self.debugging = debugging
131 self.sock = sock
132 if objtable is None:
133 objtable = objecttable
134 self.objtable = objtable
Chui Tey5d2af632002-05-26 13:36:41 +0000135 self.responses = {}
136 self.cvars = {}
137
138 def close(self):
139 sock = self.sock
140 self.sock = None
141 if sock is not None:
142 sock.close()
143
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000144 def exithook(self):
145 "override for specific exit action"
146 os._exit()
147
Chui Tey5d2af632002-05-26 13:36:41 +0000148 def debug(self, *args):
149 if not self.debugging:
150 return
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000151 s = self.location + " " + str(threading.currentThread().getName())
Chui Tey5d2af632002-05-26 13:36:41 +0000152 for a in args:
153 s = s + " " + str(a)
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000154 print>>sys.__stderr__, s
Chui Tey5d2af632002-05-26 13:36:41 +0000155
156 def register(self, oid, object):
157 self.objtable[oid] = object
158
159 def unregister(self, oid):
160 try:
161 del self.objtable[oid]
162 except KeyError:
163 pass
164
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000165 def localcall(self, seq, request):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000166 self.debug("localcall:", request)
Chui Tey5d2af632002-05-26 13:36:41 +0000167 try:
168 how, (oid, methodname, args, kwargs) = request
169 except TypeError:
170 return ("ERROR", "Bad request format")
Chui Tey5d2af632002-05-26 13:36:41 +0000171 if not self.objtable.has_key(oid):
172 return ("ERROR", "Unknown object id: %s" % `oid`)
173 obj = self.objtable[oid]
174 if methodname == "__methods__":
175 methods = {}
176 _getmethods(obj, methods)
177 return ("OK", methods)
178 if methodname == "__attributes__":
179 attributes = {}
180 _getattributes(obj, attributes)
181 return ("OK", attributes)
182 if not hasattr(obj, methodname):
183 return ("ERROR", "Unsupported method name: %s" % `methodname`)
184 method = getattr(obj, methodname)
185 try:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000186 if how == 'CALL':
187 ret = method(*args, **kwargs)
188 if isinstance(ret, RemoteObject):
189 ret = remoteref(ret)
190 return ("OK", ret)
191 elif how == 'QUEUE':
192 request_queue.put((seq, (method, args, kwargs)))
193 return("QUEUED", None)
194 else:
195 return ("ERROR", "Unsupported message type: %s" % how)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000196 except SystemExit:
197 raise
Kurt B. Kaiser9ac783d2003-03-10 20:42:24 +0000198 except socket.error:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000199 raise
Chui Tey5d2af632002-05-26 13:36:41 +0000200 except:
Kurt B. Kaiser8cd0def2003-01-31 05:06:43 +0000201 self.debug("localcall:EXCEPTION")
Kurt B. Kaiser86bc4642003-02-27 23:04:17 +0000202 traceback.print_exc(file=sys.__stderr__)
Kurt B. Kaiser8cd0def2003-01-31 05:06:43 +0000203 return ("EXCEPTION", None)
204
Chui Tey5d2af632002-05-26 13:36:41 +0000205 def remotecall(self, oid, methodname, args, kwargs):
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000206 self.debug("remotecall:asynccall: ", oid, methodname)
Chui Tey5d2af632002-05-26 13:36:41 +0000207 seq = self.asynccall(oid, methodname, args, kwargs)
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000208 return self.asyncreturn(seq)
Chui Tey5d2af632002-05-26 13:36:41 +0000209
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000210 def remotequeue(self, oid, methodname, args, kwargs):
211 self.debug("remotequeue:asyncqueue: ", oid, methodname)
212 seq = self.asyncqueue(oid, methodname, args, kwargs)
213 return self.asyncreturn(seq)
214
Chui Tey5d2af632002-05-26 13:36:41 +0000215 def asynccall(self, oid, methodname, args, kwargs):
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000216 request = ("CALL", (oid, methodname, args, kwargs))
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000217 seq = self.newseq()
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000218 if threading.currentThread() != self.sockthread:
219 cvar = threading.Condition()
220 self.cvars[seq] = cvar
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000221 self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
222 self.putmessage((seq, request))
Chui Tey5d2af632002-05-26 13:36:41 +0000223 return seq
224
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000225 def asyncqueue(self, oid, methodname, args, kwargs):
226 request = ("QUEUE", (oid, methodname, args, kwargs))
227 seq = self.newseq()
228 if threading.currentThread() != self.sockthread:
229 cvar = threading.Condition()
230 self.cvars[seq] = cvar
231 self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
232 self.putmessage((seq, request))
233 return seq
234
Chui Tey5d2af632002-05-26 13:36:41 +0000235 def asyncreturn(self, seq):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000236 self.debug("asyncreturn:%d:call getresponse(): " % seq)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000237 response = self.getresponse(seq, wait=0.05)
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000238 self.debug(("asyncreturn:%d:response: " % seq), response)
Chui Tey5d2af632002-05-26 13:36:41 +0000239 return self.decoderesponse(response)
240
241 def decoderesponse(self, response):
242 how, what = response
243 if how == "OK":
244 return what
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000245 if how == "QUEUED":
246 return None
Chui Tey5d2af632002-05-26 13:36:41 +0000247 if how == "EXCEPTION":
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000248 self.debug("decoderesponse: EXCEPTION")
249 return None
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000250 if how == "EOF":
251 self.debug("decoderesponse: EOF")
252 self.decode_interrupthook()
253 return None
Chui Tey5d2af632002-05-26 13:36:41 +0000254 if how == "ERROR":
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000255 self.debug("decoderesponse: Internal ERROR:", what)
Chui Tey5d2af632002-05-26 13:36:41 +0000256 raise RuntimeError, what
257 raise SystemError, (how, what)
258
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000259 def decode_interrupthook(self):
260 ""
261 raise EOFError
262
Chui Tey5d2af632002-05-26 13:36:41 +0000263 def mainloop(self):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000264 """Listen on socket until I/O not ready or EOF
265
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000266 pollresponse() will loop looking for seq number None, which
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000267 never comes, and exit on EOFError.
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000268
269 """
Chui Tey5d2af632002-05-26 13:36:41 +0000270 try:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000271 self.getresponse(myseq=None, wait=0.05)
Chui Tey5d2af632002-05-26 13:36:41 +0000272 except EOFError:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000273 self.debug("mainloop:return")
274 return
Chui Tey5d2af632002-05-26 13:36:41 +0000275
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000276 def getresponse(self, myseq, wait):
277 response = self._getresponse(myseq, wait)
Chui Tey5d2af632002-05-26 13:36:41 +0000278 if response is not None:
279 how, what = response
280 if how == "OK":
281 response = how, self._proxify(what)
282 return response
283
284 def _proxify(self, obj):
285 if isinstance(obj, RemoteProxy):
286 return RPCProxy(self, obj.oid)
287 if isinstance(obj, types.ListType):
288 return map(self._proxify, obj)
289 # XXX Check for other types -- not currently needed
290 return obj
291
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000292 def _getresponse(self, myseq, wait):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000293 self.debug("_getresponse:myseq:", myseq)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000294 if threading.currentThread() is self.sockthread:
295 # this thread does all reading of requests or responses
Chui Tey5d2af632002-05-26 13:36:41 +0000296 while 1:
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000297 response = self.pollresponse(myseq, wait)
Chui Tey5d2af632002-05-26 13:36:41 +0000298 if response is not None:
299 return response
300 else:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000301 # wait for notification from socket handling thread
302 cvar = self.cvars[myseq]
303 cvar.acquire()
Chui Tey5d2af632002-05-26 13:36:41 +0000304 while not self.responses.has_key(myseq):
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000305 cvar.wait()
Chui Tey5d2af632002-05-26 13:36:41 +0000306 response = self.responses[myseq]
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000307 self.debug("_getresponse:%s: thread woke up: response: %s" %
308 (myseq, response))
Chui Tey5d2af632002-05-26 13:36:41 +0000309 del self.responses[myseq]
310 del self.cvars[myseq]
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000311 cvar.release()
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000312 return response
Chui Tey5d2af632002-05-26 13:36:41 +0000313
314 def newseq(self):
315 self.nextseq = seq = self.nextseq + 2
316 return seq
317
318 def putmessage(self, message):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000319 self.debug("putmessage:%d:" % message[0])
Chui Tey5d2af632002-05-26 13:36:41 +0000320 try:
321 s = pickle.dumps(message)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000322 except pickle.UnpicklingError:
Chui Tey5d2af632002-05-26 13:36:41 +0000323 print >>sys.__stderr__, "Cannot pickle:", `message`
324 raise
325 s = struct.pack("<i", len(s)) + s
326 while len(s) > 0:
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000327 try:
328 n = self.sock.send(s)
Kurt B. Kaiser67fd0ea2003-05-24 20:59:15 +0000329 except (AttributeError, socket.error):
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000330 # socket was closed
331 raise IOError
332 else:
333 s = s[n:]
Chui Tey5d2af632002-05-26 13:36:41 +0000334
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000335 def ioready(self, wait):
Chui Tey5d2af632002-05-26 13:36:41 +0000336 r, w, x = select.select([self.sock.fileno()], [], [], wait)
337 return len(r)
338
339 buffer = ""
340 bufneed = 4
341 bufstate = 0 # meaning: 0 => reading count; 1 => reading data
342
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000343 def pollpacket(self, wait):
Chui Tey5d2af632002-05-26 13:36:41 +0000344 self._stage0()
345 if len(self.buffer) < self.bufneed:
346 if not self.ioready(wait):
347 return None
348 try:
349 s = self.sock.recv(BUFSIZE)
350 except socket.error:
351 raise EOFError
352 if len(s) == 0:
353 raise EOFError
354 self.buffer += s
355 self._stage0()
356 return self._stage1()
357
358 def _stage0(self):
359 if self.bufstate == 0 and len(self.buffer) >= 4:
360 s = self.buffer[:4]
361 self.buffer = self.buffer[4:]
362 self.bufneed = struct.unpack("<i", s)[0]
363 self.bufstate = 1
364
365 def _stage1(self):
366 if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
367 packet = self.buffer[:self.bufneed]
368 self.buffer = self.buffer[self.bufneed:]
369 self.bufneed = 4
370 self.bufstate = 0
371 return packet
372
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000373 def pollmessage(self, wait):
Chui Tey5d2af632002-05-26 13:36:41 +0000374 packet = self.pollpacket(wait)
375 if packet is None:
376 return None
377 try:
378 message = pickle.loads(packet)
379 except:
380 print >>sys.__stderr__, "-----------------------"
381 print >>sys.__stderr__, "cannot unpickle packet:", `packet`
382 traceback.print_stack(file=sys.__stderr__)
383 print >>sys.__stderr__, "-----------------------"
384 raise
385 return message
386
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000387 def pollresponse(self, myseq, wait):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000388 """Handle messages received on the socket.
389
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000390 Some messages received may be asynchronous 'call' or 'queue' requests,
391 and some may be responses for other threads.
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000392
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000393 'call' requests are passed to self.localcall() with the expectation of
394 immediate execution, during which time the socket is not serviced.
395
396 'queue' requests are used for tasks (which may block or hang) to be
397 processed in a different thread. These requests are fed into
398 request_queue by self.localcall(). Responses to queued requests are
399 taken from response_queue and sent across the link with the associated
400 sequence numbers. Messages in the queues are (sequence_number,
401 request/response) tuples and code using this module removing messages
402 from the request_queue is responsible for returning the correct
403 sequence number in the response_queue.
404
405 pollresponse() will loop until a response message with the myseq
406 sequence number is received, and will save other responses in
407 self.responses and notify the owning thread.
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000408
409 """
Chui Tey5d2af632002-05-26 13:36:41 +0000410 while 1:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000411 # send queued response if there is one available
412 try:
413 qmsg = response_queue.get(0)
414 except Queue.Empty:
415 pass
416 else:
417 seq, response = qmsg
418 message = (seq, ('OK', response))
419 self.putmessage(message)
420 # poll for message on link
421 try:
422 message = self.pollmessage(wait)
423 if message is None: # socket not ready
424 return None
425 except EOFError:
426 self.handle_EOF()
Chui Tey5d2af632002-05-26 13:36:41 +0000427 return None
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000428 except AttributeError:
429 return None
Chui Tey5d2af632002-05-26 13:36:41 +0000430 seq, resq = message
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000431 how = resq[0]
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000432 self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000433 # process or queue a request
434 if how in ("CALL", "QUEUE"):
Kurt B. Kaiserbc286132003-01-25 21:33:40 +0000435 self.debug("pollresponse:%d:localcall:call:" % seq)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000436 response = self.localcall(seq, resq)
Kurt B. Kaiserbc286132003-01-25 21:33:40 +0000437 self.debug("pollresponse:%d:localcall:response:%s"
438 % (seq, response))
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000439 if how == "CALL":
440 self.putmessage((seq, response))
441 elif how == "QUEUE":
442 # don't acknowledge the 'queue' request!
443 pass
Chui Tey5d2af632002-05-26 13:36:41 +0000444 continue
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000445 # return if completed message transaction
Chui Tey5d2af632002-05-26 13:36:41 +0000446 elif seq == myseq:
447 return resq
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000448 # must be a response for a different thread:
Chui Tey5d2af632002-05-26 13:36:41 +0000449 else:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000450 cv = self.cvars.get(seq, None)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000451 # response involving unknown sequence number is discarded,
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000452 # probably intended for prior incarnation of server
Chui Tey5d2af632002-05-26 13:36:41 +0000453 if cv is not None:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000454 cv.acquire()
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000455 self.responses[seq] = resq
Chui Tey5d2af632002-05-26 13:36:41 +0000456 cv.notify()
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000457 cv.release()
Chui Tey5d2af632002-05-26 13:36:41 +0000458 continue
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000459
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000460 def handle_EOF(self):
461 "action taken upon link being closed by peer"
462 self.EOFhook()
463 self.debug("handle_EOF")
464 for key in self.cvars:
465 cv = self.cvars[key]
466 cv.acquire()
467 self.responses[key] = ('EOF', None)
468 cv.notify()
469 cv.release()
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000470 # call our (possibly overridden) exit function
471 self.exithook()
472
473 def EOFhook(self):
474 "Classes using rpc client/server can override to augment EOF action"
475 pass
476
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000477#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000478
479class RemoteObject:
480 # Token mix-in class
481 pass
482
483def remoteref(obj):
484 oid = id(obj)
485 objecttable[oid] = obj
486 return RemoteProxy(oid)
487
488class RemoteProxy:
489
490 def __init__(self, oid):
491 self.oid = oid
492
493class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
494
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000495 debugging = False
496 location = "#S" # Server
Chui Tey5d2af632002-05-26 13:36:41 +0000497
498 def __init__(self, sock, addr, svr):
499 svr.current_handler = self ## cgt xxx
500 SocketIO.__init__(self, sock)
501 SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
502
Chui Tey5d2af632002-05-26 13:36:41 +0000503 def handle(self):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000504 "handle() method required by SocketServer"
Chui Tey5d2af632002-05-26 13:36:41 +0000505 self.mainloop()
506
507 def get_remote_proxy(self, oid):
508 return RPCProxy(self, oid)
509
510class RPCClient(SocketIO):
511
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000512 debugging = False
513 location = "#C" # Client
514
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000515 nextseq = 1 # Requests coming from the client are odd numbered
Chui Tey5d2af632002-05-26 13:36:41 +0000516
517 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000518 self.listening_sock = socket.socket(family, type)
519 self.listening_sock.setsockopt(socket.SOL_SOCKET,
520 socket.SO_REUSEADDR, 1)
521 self.listening_sock.bind(address)
522 self.listening_sock.listen(1)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000523
524 def accept(self):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000525 working_sock, address = self.listening_sock.accept()
Kurt B. Kaiser74d93c82002-12-23 22:51:03 +0000526 if self.debugging:
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000527 print>>sys.__stderr__, "****** Connection request from ", address
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000528 if address[0] == '127.0.0.1':
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000529 SocketIO.__init__(self, working_sock)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000530 else:
Kurt B. Kaiser74d93c82002-12-23 22:51:03 +0000531 print>>sys.__stderr__, "** Invalid host: ", address
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000532 raise socket.error
Chui Tey5d2af632002-05-26 13:36:41 +0000533
534 def get_remote_proxy(self, oid):
535 return RPCProxy(self, oid)
536
537class RPCProxy:
538
539 __methods = None
540 __attributes = None
541
542 def __init__(self, sockio, oid):
543 self.sockio = sockio
544 self.oid = oid
545
546 def __getattr__(self, name):
547 if self.__methods is None:
548 self.__getmethods()
549 if self.__methods.get(name):
550 return MethodProxy(self.sockio, self.oid, name)
551 if self.__attributes is None:
552 self.__getattributes()
553 if not self.__attributes.has_key(name):
554 raise AttributeError, name
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000555
Chui Tey5d2af632002-05-26 13:36:41 +0000556 def __getattributes(self):
557 self.__attributes = self.sockio.remotecall(self.oid,
558 "__attributes__", (), {})
559
560 def __getmethods(self):
561 self.__methods = self.sockio.remotecall(self.oid,
562 "__methods__", (), {})
563
564def _getmethods(obj, methods):
565 # Helper to get a list of methods from an object
566 # Adds names to dictionary argument 'methods'
567 for name in dir(obj):
568 attr = getattr(obj, name)
569 if callable(attr):
570 methods[name] = 1
571 if type(obj) == types.InstanceType:
572 _getmethods(obj.__class__, methods)
573 if type(obj) == types.ClassType:
574 for super in obj.__bases__:
575 _getmethods(super, methods)
576
577def _getattributes(obj, attributes):
578 for name in dir(obj):
579 attr = getattr(obj, name)
580 if not callable(attr):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000581 attributes[name] = 1
Chui Tey5d2af632002-05-26 13:36:41 +0000582
583class MethodProxy:
584
585 def __init__(self, sockio, oid, name):
586 self.sockio = sockio
587 self.oid = oid
588 self.name = name
589
590 def __call__(self, *args, **kwargs):
591 value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
592 return value
593
594#
595# Self Test
596#
597
598def testServer(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000599 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000600 class RemotePerson:
601 def __init__(self,name):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000602 self.name = name
Chui Tey5d2af632002-05-26 13:36:41 +0000603 def greet(self, name):
604 print "(someone called greet)"
605 print "Hello %s, I am %s." % (name, self.name)
606 print
607 def getName(self):
608 print "(someone called getName)"
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000609 print
Chui Tey5d2af632002-05-26 13:36:41 +0000610 return self.name
611 def greet_this_guy(self, name):
612 print "(someone called greet_this_guy)"
613 print "About to greet %s ..." % name
614 remote_guy = self.server.current_handler.get_remote_proxy(name)
615 remote_guy.greet("Thomas Edison")
616 print "Done."
617 print
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000618
Chui Tey5d2af632002-05-26 13:36:41 +0000619 person = RemotePerson("Thomas Edison")
620 svr = RPCServer(addr)
621 svr.register('thomas', person)
622 person.server = svr # only required if callbacks are used
623
624 # svr.serve_forever()
625 svr.handle_request() # process once only
626
627def testClient(addr):
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000628 "demonstrates RPC Client"
629 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
Chui Tey5d2af632002-05-26 13:36:41 +0000630 import time
631 clt=RPCClient(addr)
632 thomas = clt.get_remote_proxy("thomas")
633 print "The remote person's name is ..."
634 print thomas.getName()
635 # print clt.remotecall("thomas", "getName", (), {})
636 print
637 time.sleep(1)
638 print "Getting remote thomas to say hi..."
639 thomas.greet("Alexander Bell")
640 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
641 print "Done."
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000642 print
Chui Tey5d2af632002-05-26 13:36:41 +0000643 time.sleep(2)
Chui Tey5d2af632002-05-26 13:36:41 +0000644 # demonstrates remote server calling local instance
645 class LocalPerson:
646 def __init__(self,name):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000647 self.name = name
Chui Tey5d2af632002-05-26 13:36:41 +0000648 def greet(self, name):
649 print "You've greeted me!"
650 def getName(self):
651 return self.name
652 person = LocalPerson("Alexander Bell")
653 clt.register("alexander",person)
654 thomas.greet_this_guy("alexander")
655 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
656
657def test():
658 addr=("localhost",8833)
659 if len(sys.argv) == 2:
660 if sys.argv[1]=='-server':
661 testServer(addr)
662 return
663 testClient(addr)
664
665if __name__ == '__main__':
666 test()