blob: 301305ee90865d40574179c36fcd3345cf23dd20 [file] [log] [blame]
"""RPC Implementation, originally written for the Python Idle IDE

For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection.  Since Idle has
only one client per server, this was not a limitation.

   +---------------------------------+ +-------------+
   | socketserver.BaseRequestHandler | | SocketIO    |
   +---------------------------------+ +-------------+
                   ^                   | register()  |
                   |                   | unregister()|
                   |                   +-------------+
                   |                      ^  ^
                   |                      |  |
                   | + -------------------+  |
                   | |                       |
   +-------------------------+        +-----------------+
   | RPCHandler              |        | RPCClient       |
   | [attribute of RPCServer]|        |                 |
   +-------------------------+        +-----------------+

The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.

See the Idle run.main() docstring for further information on how this was
accomplished in Idle.

"""
29
30import sys
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000031import os
Chui Tey5d2af632002-05-26 13:36:41 +000032import socket
33import select
Alexandre Vassalottice261952008-05-12 02:31:37 +000034import socketserver
Chui Tey5d2af632002-05-26 13:36:41 +000035import struct
Guido van Rossum99603b02007-07-20 00:22:32 +000036import pickle
Chui Tey5d2af632002-05-26 13:36:41 +000037import threading
Alexandre Vassalottif260e442008-05-11 19:59:59 +000038import queue
Chui Tey5d2af632002-05-26 13:36:41 +000039import traceback
Alexandre Vassalottif7fa63d2008-05-11 08:55:36 +000040import copyreg
Chui Tey5d2af632002-05-26 13:36:41 +000041import types
42import marshal
43
Kurt B. Kaisera00050f2003-05-08 20:26:55 +000044
def unpickle_code(ms):
    """Rebuild a code object from the marshal byte string *ms*.

    This is the reconstructor returned by pickle_code().  marshal data is
    trusted as-is, so only feed it bytes that come from a trusted peer.
    """
    co = marshal.loads(ms)
    assert isinstance(co, types.CodeType)
    return co
49
def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns the ``(callable, args)`` pair expected from a copyreg reduction
    function: unpickle_code plus a 1-tuple holding the marshalled bytes.
    """
    assert isinstance(co, types.CodeType)
    ms = marshal.dumps(co)
    return unpickle_code, (ms,)
54
# XXX KBK 24Aug02 function pickling capability not used in Idle
#  def unpickle_function(ms):
#      return ms

#  def pickle_function(fn):
#      assert isinstance(fn, types.FunctionType)
#      return repr(fn)
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +000062
# Register the reduction function so pickle can serialize code objects.
copyreg.pickle(types.CodeType, pickle_code, unpickle_code)
# copyreg.pickle(types.FunctionType, pickle_function, unpickle_function)
Chui Tey5d2af632002-05-26 13:36:41 +000065
# Maximum number of bytes passed to a single send() / recv() call.
BUFSIZE = 8*1024
# Only connections whose peer address equals this are accepted by RPCClient.
LOCALHOST = '127.0.0.1'
Chui Tey5d2af632002-05-26 13:36:41 +000068
class RPCServer(socketserver.TCPServer):
    """TCPServer variant that connects out instead of listening.

    The usual bind()/listen()/accept() sequence is replaced: the server
    socket connect()s to ``addr`` and that single connected socket is handed
    to the handler class as "the request".
    """

    def __init__(self, addr, handlerclass=None):
        """Create the server; *handlerclass* defaults to RPCHandler."""
        if handlerclass is None:
            handlerclass = RPCHandler
        socketserver.TCPServer.__init__(self, addr, handlerclass)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"
        pass

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return self.socket, self.server_address

    def handle_error(self, request, client_address):
        """Override TCPServer method

        Error message goes to __stderr__.  No error message if exiting
        normally or socket raised EOF.  Other exceptions not handled in
        server code will cause os._exit.

        """
        try:
            # Re-raise the exception currently being handled so it can be
            # classified below (socketserver calls this from an except block).
            raise
        except SystemExit:
            raise
        except:
            erf = sys.__stderr__
            print('\n' + '-'*40, file=erf)
            print('Unhandled server exception!', file=erf)
            print('Thread: %s' % threading.current_thread().name, file=erf)
            print('Client Address: ', client_address, file=erf)
            print('Request: ', repr(request), file=erf)
            traceback.print_exc(file=erf)
            print('\n*** Unrecoverable, server exiting!', file=erf)
            print('-'*40, file=erf)
            os._exit(0)
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000116
#----------------- end class RPCServer --------------------
Kurt B. Kaiser003091c2003-02-17 18:57:16 +0000118
# Default oid -> object registry shared by SocketIO instances and remoteref().
objecttable = {}
# Unbounded queues: 'QUEUE' requests are put on request_queue by
# SocketIO.localcall(); pollresponse() drains response_queue onto the link.
request_queue = queue.Queue(0)
response_queue = queue.Queue(0)
Kurt B. Kaisera00050f2003-05-08 20:26:55 +0000122
Chui Tey5d2af632002-05-26 13:36:41 +0000123
class SocketIO(object):
    """Pickle-based message transport and RPC dispatcher over one socket.

    A message is a ``(seq, payload)`` tuple, pickled and framed with a
    4-byte little-endian length prefix.  Request payloads look like
    ``(how, (oid, methodname, args, kwargs))`` with *how* equal to 'CALL'
    or 'QUEUE'; response payloads are ``(how, what)`` pairs, see
    decoderesponse() for the recognized *how* values.

    The thread that runs __init__ (``self.sockthread``) does all reading
    from the socket.  Any other thread issuing a request waits on a
    per-request threading.Condition stored in ``self.cvars`` until the
    socket thread files the answer in ``self.responses``.
    """

    nextseq = 0

    # Defaults so debug() works on a plain SocketIO; RPCHandler and
    # RPCClient define their own values.
    debugging = False
    location = "#?"

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap the connected socket *sock*.

        *objtable* maps object ids to local objects and defaults to the
        module-level objecttable.  *debugging*, when not None, overrides
        the class-level flag for this instance.
        """
        self.sockthread = threading.current_thread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.responses = {}
        self.cvars = {}

    def close(self):
        "Close the socket (if any) and forget it."
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def exithook(self):
        "override for specific exit action"
        # os._exit() requires an exit status; calling it bare raises
        # TypeError instead of terminating the process.
        os._exit(0)

    def debug(self, *args):
        "Print a space-joined trace line to sys.__stderr__ if debugging."
        if not self.debugging:
            return
        parts = [self.location, str(threading.current_thread().name)]
        parts.extend(str(a) for a in args)
        print(" ".join(parts), file=sys.__stderr__)

    def register(self, oid, object):
        "Make *object* callable by the peer under the id *oid*."
        self.objtable[oid] = object

    def unregister(self, oid):
        "Remove *oid* from the object table; unknown ids are ignored."
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, seq, request):
        """Execute or queue a request received from the peer.

        Returns a ``(how, what)`` response tuple.  Malformed requests,
        unknown object ids and unknown method names produce ("ERROR", msg).
        The pseudo-methods "__methods__" and "__attributes__" return name
        dictionaries for the target object.  An Exception raised by the
        called method is returned as ("CALLEXC", exc); SystemExit,
        KeyboardInterrupt and socket errors propagate.
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not iterable; ValueError: wrong number of items.
            return ("ERROR", "Bad request format")
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %r" % (oid,))
        obj = self.objtable[oid]
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %r" % (methodname,))
        method = getattr(obj, methodname)
        try:
            if how == 'CALL':
                ret = method(*args, **kwargs)
                if isinstance(ret, RemoteObject):
                    ret = remoteref(ret)
                return ("OK", ret)
            elif how == 'QUEUE':
                request_queue.put((seq, (method, args, kwargs)))
                return ("QUEUED", None)
            else:
                return ("ERROR", "Unsupported message type: %s" % how)
        except SystemExit:
            raise
        except KeyboardInterrupt:
            raise
        except socket.error:
            raise
        except Exception as ex:
            return ("CALLEXC", ex)
        except:
            msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
                  " Object: %s \n Method: %s \n Args: %s\n"
            print(msg % (oid, method, args), file=sys.__stderr__)
            traceback.print_exc(file=sys.__stderr__)
            return ("EXCEPTION", None)

    def remotecall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request and return its decoded response."
        self.debug("remotecall:asynccall: ", oid, methodname)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def remotequeue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request and return its decoded response."
        self.debug("remotequeue:asyncqueue: ", oid, methodname)
        seq = self.asyncqueue(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def _startrequest(self, how, tag, oid, methodname, args, kwargs):
        "Send one request of kind *how*; return its sequence number."
        request = (how, (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.current_thread() != self.sockthread:
            # Only non-socket threads wait on a condition variable; the
            # socket thread polls the link itself in _getresponse().
            self.cvars[seq] = threading.Condition()
        self.debug(("%s:%d:" % (tag, seq)), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asynccall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request without waiting; return its sequence number."
        return self._startrequest("CALL", "asynccall",
                                  oid, methodname, args, kwargs)

    def asyncqueue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request without waiting; return its sequence number."
        return self._startrequest("QUEUE", "asyncqueue",
                                  oid, methodname, args, kwargs)

    def asyncreturn(self, seq):
        "Wait for the response to request *seq* and decode it."
        self.debug("asyncreturn:%d:call getresponse(): " % seq)
        response = self.getresponse(seq, wait=0.05)
        self.debug(("asyncreturn:%d:response: " % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a ``(how, what)`` response into a return value or exception.

        "OK" returns *what*; "QUEUED" and "EXCEPTION" return None; "EOF"
        calls decode_interrupthook(); "ERROR" raises RuntimeError(what);
        "CALLEXC" re-raises the transported exception *what*; anything
        else raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "QUEUED":
            return None
        if how == "EXCEPTION":
            self.debug("decoderesponse: EXCEPTION")
            return None
        if how == "EOF":
            self.debug("decoderesponse: EOF")
            self.decode_interrupthook()
            return None
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        if how == "CALLEXC":
            self.debug("decoderesponse: Call Exception:", what)
            raise what
        raise SystemError(how, what)

    def decode_interrupthook(self):
        "Called when an 'EOF' response is decoded; raises EOFError."
        raise EOFError

    def mainloop(self):
        """Listen on socket until I/O not ready or EOF

        pollresponse() will loop looking for seq number None, which
        never comes, and exit on EOFError.

        """
        try:
            self.getresponse(myseq=None, wait=0.05)
        except EOFError:
            self.debug("mainloop:return")
            return

    def getresponse(self, myseq, wait):
        "Return the response for *myseq*, proxifying an 'OK' payload."
        response = self._getresponse(myseq, wait)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        "Replace RemoteProxy markers (also inside lists) with RPCProxy."
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq, wait):
        "Block until the response for *myseq* is available and return it."
        self.debug("_getresponse:myseq:", myseq)
        if threading.current_thread() is self.sockthread:
            # this thread does all reading of requests or responses
            while 1:
                response = self.pollresponse(myseq, wait)
                if response is not None:
                    return response
        else:
            # wait for notification from socket handling thread
            cvar = self.cvars[myseq]
            with cvar:
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                self.debug("_getresponse:%s: thread woke up: response: %s" %
                           (myseq, response))
                del self.responses[myseq]
                del self.cvars[myseq]
            return response

    def newseq(self):
        "Advance the sequence counter by 2 and return the new value."
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message* and send it with a 4-byte length prefix.

        Raises IOError if the socket has been closed (self.sock is None);
        socket errors from select()/send() propagate unchanged.
        """
        self.debug("putmessage:%d:" % message[0])
        try:
            data = pickle.dumps(message)
        except pickle.PicklingError:
            print("Cannot pickle:", repr(message), file=sys.__stderr__)
            raise
        data = struct.pack("<i", len(data)) + data
        while len(data) > 0:
            try:
                # Block until the socket is writable, then send one chunk.
                select.select([], [self.sock], [])
                n = self.sock.send(data[:BUFSIZE])
            except (AttributeError, TypeError):
                raise IOError("socket no longer exists")
            data = data[n:]

    # Receive-side framing state (class defaults, rebound per instance).
    buff = b''
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait):
        """Return one complete packet body, or None if not yet available.

        Waits at most *wait* seconds for the socket to become readable.
        Raises EOFError when the peer closed the link or recv() failed.
        """
        self._stage0()
        if len(self.buff) < self.bufneed:
            r, w, x = select.select([self.sock.fileno()], [], [], wait)
            if len(r) == 0:
                return None
            try:
                chunk = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(chunk) == 0:
                raise EOFError
            self.buff += chunk
            self._stage0()
        return self._stage1()

    def _stage0(self):
        "Consume the 4-byte length prefix once it has fully arrived."
        if self.bufstate == 0 and len(self.buff) >= 4:
            header = self.buff[:4]
            self.buff = self.buff[4:]
            self.bufneed = struct.unpack("<i", header)[0]
            self.bufstate = 1

    def _stage1(self):
        "Return the packet body once fully buffered, else None."
        if self.bufstate == 1 and len(self.buff) >= self.bufneed:
            packet = self.buff[:self.bufneed]
            self.buff = self.buff[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet
        return None

    def pollmessage(self, wait):
        "Return the next unpickled message, or None if none is ready."
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE: pickle.loads() executes whatever the peer sent; this
            # is only safe because the peer is a trusted local process.
            message = pickle.loads(packet)
        except pickle.UnpicklingError:
            print("-----------------------", file=sys.__stderr__)
            print("cannot unpickle packet:", repr(packet), file=sys.__stderr__)
            traceback.print_stack(file=sys.__stderr__)
            print("-----------------------", file=sys.__stderr__)
            raise
        return message

    def pollresponse(self, myseq, wait):
        """Handle messages received on the socket.

        Some messages received may be asynchronous 'call' or 'queue' requests,
        and some may be responses for other threads.

        'call' requests are passed to self.localcall() with the expectation of
        immediate execution, during which time the socket is not serviced.

        'queue' requests are used for tasks (which may block or hang) to be
        processed in a different thread.  These requests are fed into
        request_queue by self.localcall().  Responses to queued requests are
        taken from response_queue and sent across the link with the associated
        sequence numbers.  Messages in the queues are (sequence_number,
        request/response) tuples and code using this module removing messages
        from the request_queue is responsible for returning the correct
        sequence number in the response_queue.

        pollresponse() will loop until a response message with the myseq
        sequence number is received, and will save other responses in
        self.responses and notify the owning thread.

        """
        while 1:
            # send queued response if there is one available
            try:
                qmsg = response_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                seq, response = qmsg
                message = (seq, ('OK', response))
                self.putmessage(message)
            # poll for message on link
            try:
                message = self.pollmessage(wait)
                if message is None:  # socket not ready
                    return None
            except EOFError:
                self.handle_EOF()
                return None
            except AttributeError:
                return None
            seq, resq = message
            how = resq[0]
            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
            # process or queue a request
            if how in ("CALL", "QUEUE"):
                self.debug("pollresponse:%d:localcall:call:" % seq)
                response = self.localcall(seq, resq)
                self.debug("pollresponse:%d:localcall:response:%s"
                           % (seq, response))
                if how == "CALL":
                    self.putmessage((seq, response))
                elif how == "QUEUE":
                    # don't acknowledge the 'queue' request!
                    pass
                continue
            # return if completed message transaction
            elif seq == myseq:
                return resq
            # must be a response for a different thread:
            else:
                cv = self.cvars.get(seq, None)
                # response involving unknown sequence number is discarded,
                # probably intended for prior incarnation of server
                if cv is not None:
                    with cv:
                        self.responses[seq] = resq
                        cv.notify()
                continue

    def handle_EOF(self):
        "action taken upon link being closed by peer"
        self.EOFhook()
        self.debug("handle_EOF")
        # Iterate over a snapshot: a woken waiter deletes its own entry
        # from self.cvars in _getresponse(), which would otherwise change
        # the dict while it is being iterated.
        for key, cv in list(self.cvars.items()):
            with cv:
                self.responses[key] = ('EOF', None)
                cv.notify()
        # call our (possibly overridden) exit function
        self.exithook()

    def EOFhook(self):
        "Classes using rpc client/server can override to augment EOF action"
        pass
485
#----------------- end class SocketIO --------------------
Chui Tey5d2af632002-05-26 13:36:41 +0000487
class RemoteObject(object):
    """Token mix-in class.

    SocketIO.localcall() tests call results with
    ``isinstance(ret, RemoteObject)`` and passes matches through
    remoteref() instead of returning the object itself.
    """
    pass
491
def remoteref(obj):
    """Register *obj* in the module-level objecttable; return its proxy.

    The object is stored under ``id(obj)`` and a RemoteProxy carrying that
    id is returned.
    """
    oid = id(obj)
    objecttable[oid] = obj
    return RemoteProxy(oid)
496
class RemoteProxy(object):
    """Marker holding the object id *oid* of an object in an object table.

    SocketIO._proxify() converts instances of this class into RPCProxy
    objects bound to the receiving connection.
    """

    def __init__(self, oid):
        self.oid = oid
501
class RPCHandler(socketserver.BaseRequestHandler, SocketIO):
    """Request handler that serves RPC traffic on the server's socket."""

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        """Set up SocketIO on *sock*, then run the socketserver handler.

        The server's ``current_handler`` attribute is pointed at this
        instance before the base class initializer runs.
        """
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        # NOTE(review): per the socketserver docs the base initializer
        # calls handle(), so mainloop() presumably runs inside this call.
        socketserver.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by socketserver"
        self.mainloop()

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer's object registered as *oid*."
        return RPCProxy(self, oid)
518
class RPCClient(SocketIO):
    """Listening end of the link: binds, listens and accepts one peer."""

    debugging = False
    location = "#C"  # Client

    nextseq = 1  # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create the listening socket bound to *address*.

        SocketIO is not initialized here; that happens in accept() once a
        peer has connected.
        """
        self.listening_sock = socket.socket(family, type)
        try:
            self.listening_sock.bind(address)
            self.listening_sock.listen(1)
        except Exception:
            # Do not leak the descriptor when bind()/listen() fails.
            self.listening_sock.close()
            raise

    def accept(self):
        """Accept one connection and initialize SocketIO on it.

        Raises socket.error if the peer's host is not LOCALHOST; the
        rejected connection is closed first.
        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            print("****** Connection request from ", address, file=sys.__stderr__)
        if address[0] != LOCALHOST:
            print("** Invalid host: ", address, file=sys.__stderr__)
            working_sock.close()
            raise socket.error("connection from invalid host: %r" % (address,))
        SocketIO.__init__(self, working_sock)

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer's object registered as *oid*."
        return RPCProxy(self, oid)
543
class RPCProxy(object):
    """Local stand-in for the remote object registered as *oid*.

    Attribute access is resolved lazily: the remote method and attribute
    name tables are fetched on first use and cached on the instance.
    """

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Resolve *name* against the remote object.

        Returns a MethodProxy for remote callables, the fetched value for
        remote non-callable attributes, and raises AttributeError
        otherwise.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name in self.__attributes:
            value = self.sockio.remotecall(self.oid, '__getattribute__',
                                           (name,), {})
            return value
        else:
            raise AttributeError(name)

    def __getattributes(self):
        "Fetch and cache the remote object's non-callable attribute names."
        self.__attributes = self.sockio.remotecall(self.oid,
                                                   "__attributes__", (), {})

    def __getmethods(self):
        "Fetch and cache the remote object's callable attribute names."
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})
574
575def _getmethods(obj, methods):
576 # Helper to get a list of methods from an object
577 # Adds names to dictionary argument 'methods'
578 for name in dir(obj):
579 attr = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200580 if callable(attr):
Chui Tey5d2af632002-05-26 13:36:41 +0000581 methods[name] = 1
Guido van Rossum13257902007-06-07 23:15:56 +0000582 if isinstance(obj, type):
Chui Tey5d2af632002-05-26 13:36:41 +0000583 for super in obj.__bases__:
584 _getmethods(super, methods)
585
586def _getattributes(obj, attributes):
587 for name in dir(obj):
588 attr = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200589 if not callable(attr):
Kurt B. Kaiser6655e4b2002-12-31 16:03:23 +0000590 attributes[name] = 1
Chui Tey5d2af632002-05-26 13:36:41 +0000591
class MethodProxy(object):
    """Callable that forwards calls to method *name* of remote object *oid*."""

    def __init__(self, sockio, oid, name):
        self.sockio = sockio
        self.oid = oid
        self.name = name

    def __call__(self, *args, **kwargs):
        "Invoke the remote method via sockio.remotecall(); return its result."
        value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
        return value
602
Chui Tey5d2af632002-05-26 13:36:41 +0000603
# XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
#                  existing test code was removed at Rev 1.27 (r34098).