blob: 0c56ccdcfe50532a7f6ac3ae7399acab99691132 [file] [log] [blame]
Kurt B. Kaiserb4179362002-07-26 00:06:42 +00001"""RPC Implemention, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
5has only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
Alexandre Vassalottice261952008-05-12 02:31:37 +00008 | socketserver.BaseRequestHandler | | SocketIO |
Kurt B. Kaiserb4179362002-07-26 00:06:42 +00009 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000031import os
Chui Tey5d2af632002-05-26 13:36:41 +000032import socket
33import select
Alexandre Vassalottice261952008-05-12 02:31:37 +000034import socketserver
Chui Tey5d2af632002-05-26 13:36:41 +000035import struct
Guido van Rossum99603b02007-07-20 00:22:32 +000036import pickle
Chui Tey5d2af632002-05-26 13:36:41 +000037import threading
Alexandre Vassalottif260e442008-05-11 19:59:59 +000038import queue
Chui Tey5d2af632002-05-26 13:36:41 +000039import traceback
Alexandre Vassalottif7fa63d2008-05-11 08:55:36 +000040import copyreg
Chui Tey5d2af632002-05-26 13:36:41 +000041import types
42import marshal
43
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000044
Chui Tey5d2af632002-05-26 13:36:41 +000045def unpickle_code(ms):
46 co = marshal.loads(ms)
47 assert isinstance(co, types.CodeType)
48 return co
49
50def pickle_code(co):
51 assert isinstance(co, types.CodeType)
52 ms = marshal.dumps(co)
53 return unpickle_code, (ms,)
54
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000055# XXX KBK 24Aug02 function pickling capability not used in Idle
56# def unpickle_function(ms):
57# return ms
Chui Tey5d2af632002-05-26 13:36:41 +000058
Kurt B. Kaiseradc63842002-08-25 14:08:07 +000059# def pickle_function(fn):
60# assert isinstance(fn, type.FunctionType)
Walter Dörwald70a6b492004-02-12 17:35:32 +000061# return repr(fn)
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +000062
Alexandre Vassalottif7fa63d2008-05-11 08:55:36 +000063copyreg.pickle(types.CodeType, pickle_code, unpickle_code)
64# copyreg.pickle(types.FunctionType, pickle_function, unpickle_function)
Chui Tey5d2af632002-05-26 13:36:41 +000065
66BUFSIZE = 8*1024
Kurt B. Kaiser24d7e0c2003-06-05 23:51:29 +000067LOCALHOST = '127.0.0.1'
Chui Tey5d2af632002-05-26 13:36:41 +000068
Alexandre Vassalottice261952008-05-12 02:31:37 +000069class RPCServer(socketserver.TCPServer):
Chui Tey5d2af632002-05-26 13:36:41 +000070
71 def __init__(self, addr, handlerclass=None):
72 if handlerclass is None:
73 handlerclass = RPCHandler
Alexandre Vassalottice261952008-05-12 02:31:37 +000074 socketserver.TCPServer.__init__(self, addr, handlerclass)
Chui Tey5d2af632002-05-26 13:36:41 +000075
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000076 def server_bind(self):
77 "Override TCPServer method, no bind() phase for connecting entity"
78 pass
Chui Tey5d2af632002-05-26 13:36:41 +000079
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000080 def server_activate(self):
81 """Override TCPServer method, connect() instead of listen()
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +000082
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000083 Due to the reversed connection, self.server_address is actually the
84 address of the Idle Client to which we are connecting.
Chui Tey5d2af632002-05-26 13:36:41 +000085
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000086 """
87 self.socket.connect(self.server_address)
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +000088
Kurt B. Kaiserb4179362002-07-26 00:06:42 +000089 def get_request(self):
90 "Override TCPServer method, return already connected socket"
91 return self.socket, self.server_address
Chui Tey5d2af632002-05-26 13:36:41 +000092
Kurt B. Kaiser003091c2003-02-17 18:57:16 +000093 def handle_error(self, request, client_address):
Kurt B. Kaisere51529d2003-03-22 19:15:58 +000094 """Override TCPServer method
95
96 Error message goes to __stderr__. No error message if exiting
97 normally or socket raised EOF. Other exceptions not handled in
98 server code will cause os._exit.
99
100 """
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000101 try:
102 raise
103 except SystemExit:
104 raise
Kurt B. Kaisere51529d2003-03-22 19:15:58 +0000105 except:
Kurt B. Kaiser05293772003-03-22 20:11:14 +0000106 erf = sys.__stderr__
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000107 print('\n' + '-'*40, file=erf)
108 print('Unhandled server exception!', file=erf)
Amaury Forgeot d'Arcbed17102008-11-29 01:48:47 +0000109 print('Thread: %s' % threading.current_thread().name, file=erf)
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000110 print('Client Address: ', client_address, file=erf)
111 print('Request: ', repr(request), file=erf)
Kurt B. Kaisere51529d2003-03-22 19:15:58 +0000112 traceback.print_exc(file=erf)
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000113 print('\n*** Unrecoverable, server exiting!', file=erf)
114 print('-'*40, file=erf)
Kurt B. Kaiser05293772003-03-22 20:11:14 +0000115 os._exit(0)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000116
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000117#----------------- end class RPCServer --------------------
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000118
Chui Tey5d2af632002-05-26 13:36:41 +0000119objecttable = {}
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000120request_queue = queue.Queue(0)
121response_queue = queue.Queue(0)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000122
Chui Tey5d2af632002-05-26 13:36:41 +0000123
Kurt B. Kaiserdcba6622004-12-21 22:10:32 +0000124class SocketIO(object):
Chui Tey5d2af632002-05-26 13:36:41 +0000125
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000126 nextseq = 0
127
Chui Tey5d2af632002-05-26 13:36:41 +0000128 def __init__(self, sock, objtable=None, debugging=None):
Benjamin Petersonb03ca4b2008-06-13 02:00:47 +0000129 self.sockthread = threading.current_thread()
Chui Tey5d2af632002-05-26 13:36:41 +0000130 if debugging is not None:
131 self.debugging = debugging
132 self.sock = sock
133 if objtable is None:
134 objtable = objecttable
135 self.objtable = objtable
Chui Tey5d2af632002-05-26 13:36:41 +0000136 self.responses = {}
137 self.cvars = {}
138
139 def close(self):
140 sock = self.sock
141 self.sock = None
142 if sock is not None:
143 sock.close()
144
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000145 def exithook(self):
146 "override for specific exit action"
147 os._exit()
148
Chui Tey5d2af632002-05-26 13:36:41 +0000149 def debug(self, *args):
150 if not self.debugging:
151 return
Amaury Forgeot d'Arcbed17102008-11-29 01:48:47 +0000152 s = self.location + " " + str(threading.current_thread().name)
Chui Tey5d2af632002-05-26 13:36:41 +0000153 for a in args:
154 s = s + " " + str(a)
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000155 print(s, file=sys.__stderr__)
Chui Tey5d2af632002-05-26 13:36:41 +0000156
157 def register(self, oid, object):
158 self.objtable[oid] = object
159
160 def unregister(self, oid):
161 try:
162 del self.objtable[oid]
163 except KeyError:
164 pass
165
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000166 def localcall(self, seq, request):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000167 self.debug("localcall:", request)
Chui Tey5d2af632002-05-26 13:36:41 +0000168 try:
169 how, (oid, methodname, args, kwargs) = request
170 except TypeError:
171 return ("ERROR", "Bad request format")
Guido van Rossum811c4e02006-08-22 15:45:46 +0000172 if oid not in self.objtable:
Walter Dörwald70a6b492004-02-12 17:35:32 +0000173 return ("ERROR", "Unknown object id: %r" % (oid,))
Chui Tey5d2af632002-05-26 13:36:41 +0000174 obj = self.objtable[oid]
175 if methodname == "__methods__":
176 methods = {}
177 _getmethods(obj, methods)
178 return ("OK", methods)
179 if methodname == "__attributes__":
180 attributes = {}
181 _getattributes(obj, attributes)
182 return ("OK", attributes)
183 if not hasattr(obj, methodname):
Walter Dörwald70a6b492004-02-12 17:35:32 +0000184 return ("ERROR", "Unsupported method name: %r" % (methodname,))
Chui Tey5d2af632002-05-26 13:36:41 +0000185 method = getattr(obj, methodname)
186 try:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000187 if how == 'CALL':
188 ret = method(*args, **kwargs)
189 if isinstance(ret, RemoteObject):
190 ret = remoteref(ret)
191 return ("OK", ret)
192 elif how == 'QUEUE':
193 request_queue.put((seq, (method, args, kwargs)))
194 return("QUEUED", None)
195 else:
196 return ("ERROR", "Unsupported message type: %s" % how)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000197 except SystemExit:
198 raise
Kurt B. Kaiser9ac783d2003-03-10 20:42:24 +0000199 except socket.error:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000200 raise
Chui Tey5d2af632002-05-26 13:36:41 +0000201 except:
Kurt B. Kaisere852c192004-12-23 04:39:55 +0000202 msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
203 " Object: %s \n Method: %s \n Args: %s\n"
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000204 print(msg % (oid, method, args), file=sys.__stderr__)
Kurt B. Kaiser86bc4642003-02-27 23:04:17 +0000205 traceback.print_exc(file=sys.__stderr__)
Kurt B. Kaiser8cd0def2003-01-31 05:06:43 +0000206 return ("EXCEPTION", None)
207
Chui Tey5d2af632002-05-26 13:36:41 +0000208 def remotecall(self, oid, methodname, args, kwargs):
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000209 self.debug("remotecall:asynccall: ", oid, methodname)
Chui Tey5d2af632002-05-26 13:36:41 +0000210 seq = self.asynccall(oid, methodname, args, kwargs)
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000211 return self.asyncreturn(seq)
Chui Tey5d2af632002-05-26 13:36:41 +0000212
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000213 def remotequeue(self, oid, methodname, args, kwargs):
214 self.debug("remotequeue:asyncqueue: ", oid, methodname)
215 seq = self.asyncqueue(oid, methodname, args, kwargs)
216 return self.asyncreturn(seq)
217
Chui Tey5d2af632002-05-26 13:36:41 +0000218 def asynccall(self, oid, methodname, args, kwargs):
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000219 request = ("CALL", (oid, methodname, args, kwargs))
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000220 seq = self.newseq()
Benjamin Petersonb03ca4b2008-06-13 02:00:47 +0000221 if threading.current_thread() != self.sockthread:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000222 cvar = threading.Condition()
223 self.cvars[seq] = cvar
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000224 self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
225 self.putmessage((seq, request))
Chui Tey5d2af632002-05-26 13:36:41 +0000226 return seq
227
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000228 def asyncqueue(self, oid, methodname, args, kwargs):
229 request = ("QUEUE", (oid, methodname, args, kwargs))
230 seq = self.newseq()
Benjamin Petersonb03ca4b2008-06-13 02:00:47 +0000231 if threading.current_thread() != self.sockthread:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000232 cvar = threading.Condition()
233 self.cvars[seq] = cvar
234 self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
235 self.putmessage((seq, request))
236 return seq
237
Chui Tey5d2af632002-05-26 13:36:41 +0000238 def asyncreturn(self, seq):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000239 self.debug("asyncreturn:%d:call getresponse(): " % seq)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000240 response = self.getresponse(seq, wait=0.05)
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000241 self.debug(("asyncreturn:%d:response: " % seq), response)
Chui Tey5d2af632002-05-26 13:36:41 +0000242 return self.decoderesponse(response)
243
244 def decoderesponse(self, response):
245 how, what = response
246 if how == "OK":
247 return what
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000248 if how == "QUEUED":
249 return None
Chui Tey5d2af632002-05-26 13:36:41 +0000250 if how == "EXCEPTION":
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000251 self.debug("decoderesponse: EXCEPTION")
252 return None
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000253 if how == "EOF":
254 self.debug("decoderesponse: EOF")
255 self.decode_interrupthook()
256 return None
Chui Tey5d2af632002-05-26 13:36:41 +0000257 if how == "ERROR":
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000258 self.debug("decoderesponse: Internal ERROR:", what)
Kurt B. Kaiserad667422007-08-23 01:06:15 +0000259 raise RuntimeError(what)
260 raise SystemError(how, what)
Chui Tey5d2af632002-05-26 13:36:41 +0000261
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000262 def decode_interrupthook(self):
263 ""
264 raise EOFError
265
Chui Tey5d2af632002-05-26 13:36:41 +0000266 def mainloop(self):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000267 """Listen on socket until I/O not ready or EOF
268
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000269 pollresponse() will loop looking for seq number None, which
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000270 never comes, and exit on EOFError.
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000271
272 """
Chui Tey5d2af632002-05-26 13:36:41 +0000273 try:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000274 self.getresponse(myseq=None, wait=0.05)
Chui Tey5d2af632002-05-26 13:36:41 +0000275 except EOFError:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000276 self.debug("mainloop:return")
277 return
Chui Tey5d2af632002-05-26 13:36:41 +0000278
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000279 def getresponse(self, myseq, wait):
280 response = self._getresponse(myseq, wait)
Chui Tey5d2af632002-05-26 13:36:41 +0000281 if response is not None:
282 how, what = response
283 if how == "OK":
284 response = how, self._proxify(what)
285 return response
286
287 def _proxify(self, obj):
288 if isinstance(obj, RemoteProxy):
289 return RPCProxy(self, obj.oid)
Guido van Rossum13257902007-06-07 23:15:56 +0000290 if isinstance(obj, list):
Kurt B. Kaiser66aaf742007-08-09 18:00:23 +0000291 return list(map(self._proxify, obj))
Chui Tey5d2af632002-05-26 13:36:41 +0000292 # XXX Check for other types -- not currently needed
293 return obj
294
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000295 def _getresponse(self, myseq, wait):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000296 self.debug("_getresponse:myseq:", myseq)
Benjamin Petersonb03ca4b2008-06-13 02:00:47 +0000297 if threading.current_thread() is self.sockthread:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000298 # this thread does all reading of requests or responses
Chui Tey5d2af632002-05-26 13:36:41 +0000299 while 1:
Kurt B. Kaiser94afd302003-03-12 20:52:00 +0000300 response = self.pollresponse(myseq, wait)
Chui Tey5d2af632002-05-26 13:36:41 +0000301 if response is not None:
302 return response
303 else:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000304 # wait for notification from socket handling thread
305 cvar = self.cvars[myseq]
306 cvar.acquire()
Guido van Rossum811c4e02006-08-22 15:45:46 +0000307 while myseq not in self.responses:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000308 cvar.wait()
Chui Tey5d2af632002-05-26 13:36:41 +0000309 response = self.responses[myseq]
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000310 self.debug("_getresponse:%s: thread woke up: response: %s" %
311 (myseq, response))
Chui Tey5d2af632002-05-26 13:36:41 +0000312 del self.responses[myseq]
313 del self.cvars[myseq]
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000314 cvar.release()
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000315 return response
Chui Tey5d2af632002-05-26 13:36:41 +0000316
317 def newseq(self):
318 self.nextseq = seq = self.nextseq + 2
319 return seq
320
321 def putmessage(self, message):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000322 self.debug("putmessage:%d:" % message[0])
Chui Tey5d2af632002-05-26 13:36:41 +0000323 try:
324 s = pickle.dumps(message)
Kurt B. Kaiserd6ab77d2004-01-21 19:21:11 +0000325 except pickle.PicklingError:
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000326 print("Cannot pickle:", repr(message), file=sys.__stderr__)
Chui Tey5d2af632002-05-26 13:36:41 +0000327 raise
328 s = struct.pack("<i", len(s)) + s
329 while len(s) > 0:
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000330 try:
Kurt B. Kaiserd6ab77d2004-01-21 19:21:11 +0000331 r, w, x = select.select([], [self.sock], [])
332 n = self.sock.send(s[:BUFSIZE])
Kurt B. Kaiser935ea9a2005-05-10 03:44:24 +0000333 except (AttributeError, TypeError):
Kurt B. Kaiserad667422007-08-23 01:06:15 +0000334 raise IOError("socket no longer exists")
Kurt B. Kaiser935ea9a2005-05-10 03:44:24 +0000335 except socket.error:
336 raise
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000337 else:
338 s = s[n:]
Chui Tey5d2af632002-05-26 13:36:41 +0000339
Kurt B. Kaiser2d726df2007-08-22 21:33:27 +0000340 buff = b''
Chui Tey5d2af632002-05-26 13:36:41 +0000341 bufneed = 4
342 bufstate = 0 # meaning: 0 => reading count; 1 => reading data
343
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000344 def pollpacket(self, wait):
Chui Tey5d2af632002-05-26 13:36:41 +0000345 self._stage0()
Kurt B. Kaiser2d726df2007-08-22 21:33:27 +0000346 if len(self.buff) < self.bufneed:
Kurt B. Kaiserd6ab77d2004-01-21 19:21:11 +0000347 r, w, x = select.select([self.sock.fileno()], [], [], wait)
348 if len(r) == 0:
Chui Tey5d2af632002-05-26 13:36:41 +0000349 return None
350 try:
351 s = self.sock.recv(BUFSIZE)
352 except socket.error:
353 raise EOFError
354 if len(s) == 0:
355 raise EOFError
Kurt B. Kaiser2d726df2007-08-22 21:33:27 +0000356 self.buff += s
Chui Tey5d2af632002-05-26 13:36:41 +0000357 self._stage0()
358 return self._stage1()
359
360 def _stage0(self):
Kurt B. Kaiser2d726df2007-08-22 21:33:27 +0000361 if self.bufstate == 0 and len(self.buff) >= 4:
362 s = self.buff[:4]
363 self.buff = self.buff[4:]
Chui Tey5d2af632002-05-26 13:36:41 +0000364 self.bufneed = struct.unpack("<i", s)[0]
365 self.bufstate = 1
366
367 def _stage1(self):
Kurt B. Kaiser2d726df2007-08-22 21:33:27 +0000368 if self.bufstate == 1 and len(self.buff) >= self.bufneed:
369 packet = self.buff[:self.bufneed]
370 self.buff = self.buff[self.bufneed:]
Chui Tey5d2af632002-05-26 13:36:41 +0000371 self.bufneed = 4
372 self.bufstate = 0
373 return packet
374
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000375 def pollmessage(self, wait):
Chui Tey5d2af632002-05-26 13:36:41 +0000376 packet = self.pollpacket(wait)
377 if packet is None:
378 return None
379 try:
380 message = pickle.loads(packet)
Kurt B. Kaiserd6ab77d2004-01-21 19:21:11 +0000381 except pickle.UnpicklingError:
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000382 print("-----------------------", file=sys.__stderr__)
383 print("cannot unpickle packet:", repr(packet), file=sys.__stderr__)
Chui Tey5d2af632002-05-26 13:36:41 +0000384 traceback.print_stack(file=sys.__stderr__)
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000385 print("-----------------------", file=sys.__stderr__)
Chui Tey5d2af632002-05-26 13:36:41 +0000386 raise
387 return message
388
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000389 def pollresponse(self, myseq, wait):
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000390 """Handle messages received on the socket.
391
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000392 Some messages received may be asynchronous 'call' or 'queue' requests,
393 and some may be responses for other threads.
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000394
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000395 'call' requests are passed to self.localcall() with the expectation of
396 immediate execution, during which time the socket is not serviced.
397
398 'queue' requests are used for tasks (which may block or hang) to be
399 processed in a different thread. These requests are fed into
400 request_queue by self.localcall(). Responses to queued requests are
401 taken from response_queue and sent across the link with the associated
402 sequence numbers. Messages in the queues are (sequence_number,
403 request/response) tuples and code using this module removing messages
404 from the request_queue is responsible for returning the correct
405 sequence number in the response_queue.
406
407 pollresponse() will loop until a response message with the myseq
408 sequence number is received, and will save other responses in
409 self.responses and notify the owning thread.
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000410
411 """
Chui Tey5d2af632002-05-26 13:36:41 +0000412 while 1:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000413 # send queued response if there is one available
414 try:
415 qmsg = response_queue.get(0)
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000416 except queue.Empty:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000417 pass
418 else:
419 seq, response = qmsg
420 message = (seq, ('OK', response))
421 self.putmessage(message)
422 # poll for message on link
423 try:
424 message = self.pollmessage(wait)
425 if message is None: # socket not ready
426 return None
427 except EOFError:
428 self.handle_EOF()
Chui Tey5d2af632002-05-26 13:36:41 +0000429 return None
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000430 except AttributeError:
431 return None
Chui Tey5d2af632002-05-26 13:36:41 +0000432 seq, resq = message
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000433 how = resq[0]
Kurt B. Kaiser0a0e6c32003-01-25 03:26:35 +0000434 self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000435 # process or queue a request
436 if how in ("CALL", "QUEUE"):
Kurt B. Kaiserbc286132003-01-25 21:33:40 +0000437 self.debug("pollresponse:%d:localcall:call:" % seq)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000438 response = self.localcall(seq, resq)
Kurt B. Kaiserbc286132003-01-25 21:33:40 +0000439 self.debug("pollresponse:%d:localcall:response:%s"
440 % (seq, response))
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000441 if how == "CALL":
442 self.putmessage((seq, response))
443 elif how == "QUEUE":
444 # don't acknowledge the 'queue' request!
445 pass
Chui Tey5d2af632002-05-26 13:36:41 +0000446 continue
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000447 # return if completed message transaction
Chui Tey5d2af632002-05-26 13:36:41 +0000448 elif seq == myseq:
449 return resq
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000450 # must be a response for a different thread:
Chui Tey5d2af632002-05-26 13:36:41 +0000451 else:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000452 cv = self.cvars.get(seq, None)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000453 # response involving unknown sequence number is discarded,
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000454 # probably intended for prior incarnation of server
Chui Tey5d2af632002-05-26 13:36:41 +0000455 if cv is not None:
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000456 cv.acquire()
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000457 self.responses[seq] = resq
Chui Tey5d2af632002-05-26 13:36:41 +0000458 cv.notify()
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000459 cv.release()
Chui Tey5d2af632002-05-26 13:36:41 +0000460 continue
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000461
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000462 def handle_EOF(self):
463 "action taken upon link being closed by peer"
464 self.EOFhook()
465 self.debug("handle_EOF")
466 for key in self.cvars:
467 cv = self.cvars[key]
468 cv.acquire()
469 self.responses[key] = ('EOF', None)
470 cv.notify()
471 cv.release()
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000472 # call our (possibly overridden) exit function
473 self.exithook()
474
475 def EOFhook(self):
476 "Classes using rpc client/server can override to augment EOF action"
477 pass
478
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000479#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000480
Kurt B. Kaiserdcba6622004-12-21 22:10:32 +0000481class RemoteObject(object):
Chui Tey5d2af632002-05-26 13:36:41 +0000482 # Token mix-in class
483 pass
484
485def remoteref(obj):
486 oid = id(obj)
487 objecttable[oid] = obj
488 return RemoteProxy(oid)
489
Kurt B. Kaiserdcba6622004-12-21 22:10:32 +0000490class RemoteProxy(object):
Chui Tey5d2af632002-05-26 13:36:41 +0000491
492 def __init__(self, oid):
493 self.oid = oid
494
Alexandre Vassalottice261952008-05-12 02:31:37 +0000495class RPCHandler(socketserver.BaseRequestHandler, SocketIO):
Chui Tey5d2af632002-05-26 13:36:41 +0000496
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000497 debugging = False
498 location = "#S" # Server
Chui Tey5d2af632002-05-26 13:36:41 +0000499
500 def __init__(self, sock, addr, svr):
501 svr.current_handler = self ## cgt xxx
502 SocketIO.__init__(self, sock)
Alexandre Vassalottice261952008-05-12 02:31:37 +0000503 socketserver.BaseRequestHandler.__init__(self, sock, addr, svr)
Chui Tey5d2af632002-05-26 13:36:41 +0000504
Chui Tey5d2af632002-05-26 13:36:41 +0000505 def handle(self):
Alexandre Vassalottice261952008-05-12 02:31:37 +0000506 "handle() method required by socketserver"
Chui Tey5d2af632002-05-26 13:36:41 +0000507 self.mainloop()
508
509 def get_remote_proxy(self, oid):
510 return RPCProxy(self, oid)
511
512class RPCClient(SocketIO):
513
Kurt B. Kaiser0930c432002-12-06 21:45:24 +0000514 debugging = False
515 location = "#C" # Client
516
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000517 nextseq = 1 # Requests coming from the client are odd numbered
Chui Tey5d2af632002-05-26 13:36:41 +0000518
519 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000520 self.listening_sock = socket.socket(family, type)
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000521 self.listening_sock.bind(address)
522 self.listening_sock.listen(1)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000523
524 def accept(self):
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000525 working_sock, address = self.listening_sock.accept()
Kurt B. Kaiser74d93c82002-12-23 22:51:03 +0000526 if self.debugging:
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000527 print("****** Connection request from ", address, file=sys.__stderr__)
Kurt B. Kaiser24d7e0c2003-06-05 23:51:29 +0000528 if address[0] == LOCALHOST:
Kurt B. Kaiseradc63842002-08-25 14:08:07 +0000529 SocketIO.__init__(self, working_sock)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000530 else:
Guido van Rossumbe19ed72007-02-09 05:37:30 +0000531 print("** Invalid host: ", address, file=sys.__stderr__)
Kurt B. Kaiserb4179362002-07-26 00:06:42 +0000532 raise socket.error
Chui Tey5d2af632002-05-26 13:36:41 +0000533
534 def get_remote_proxy(self, oid):
535 return RPCProxy(self, oid)
536
Kurt B. Kaiserdcba6622004-12-21 22:10:32 +0000537class RPCProxy(object):
Chui Tey5d2af632002-05-26 13:36:41 +0000538
539 __methods = None
540 __attributes = None
541
542 def __init__(self, sockio, oid):
543 self.sockio = sockio
544 self.oid = oid
545
546 def __getattr__(self, name):
547 if self.__methods is None:
548 self.__getmethods()
549 if self.__methods.get(name):
550 return MethodProxy(self.sockio, self.oid, name)
551 if self.__attributes is None:
552 self.__getattributes()
Guido van Rossum811c4e02006-08-22 15:45:46 +0000553 if name in self.__attributes:
Kurt B. Kaiserdcba6622004-12-21 22:10:32 +0000554 value = self.sockio.remotecall(self.oid, '__getattribute__',
555 (name,), {})
556 return value
557 else:
Kurt B. Kaiserad667422007-08-23 01:06:15 +0000558 raise AttributeError(name)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000559
Chui Tey5d2af632002-05-26 13:36:41 +0000560 def __getattributes(self):
561 self.__attributes = self.sockio.remotecall(self.oid,
562 "__attributes__", (), {})
563
564 def __getmethods(self):
565 self.__methods = self.sockio.remotecall(self.oid,
566 "__methods__", (), {})
567
568def _getmethods(obj, methods):
569 # Helper to get a list of methods from an object
570 # Adds names to dictionary argument 'methods'
571 for name in dir(obj):
572 attr = getattr(obj, name)
Guido van Rossumd59da4b2007-05-22 18:11:13 +0000573 if hasattr(attr, '__call__'):
Chui Tey5d2af632002-05-26 13:36:41 +0000574 methods[name] = 1
Guido van Rossum13257902007-06-07 23:15:56 +0000575 if isinstance(obj, type):
Chui Tey5d2af632002-05-26 13:36:41 +0000576 for super in obj.__bases__:
577 _getmethods(super, methods)
578
579def _getattributes(obj, attributes):
580 for name in dir(obj):
581 attr = getattr(obj, name)
Guido van Rossumd59da4b2007-05-22 18:11:13 +0000582 if not hasattr(attr, '__call__'):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000583 attributes[name] = 1
Chui Tey5d2af632002-05-26 13:36:41 +0000584
Kurt B. Kaiserdcba6622004-12-21 22:10:32 +0000585class MethodProxy(object):
Chui Tey5d2af632002-05-26 13:36:41 +0000586
587 def __init__(self, sockio, oid, name):
588 self.sockio = sockio
589 self.oid = oid
590 self.name = name
591
592 def __call__(self, *args, **kwargs):
593 value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
594 return value
595
Chui Tey5d2af632002-05-26 13:36:41 +0000596
Kurt B. Kaiser62685d32003-09-10 02:42:18 +0000597# XXX KBK 09Sep03 We need a proper unit test for this module. Previously
Benjamin Peterson8719ad52009-09-11 22:24:02 +0000598# existing test code was removed at Rev 1.27 (r34098).