blob: 776656ea176a2a6868eb4b66273ea6840f589a3d [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
# Names exported when this module is star-imported.
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
Charles-François Natalic8ce7152012-04-17 18:45:57 +020022from time import time as _time
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010026from . import context
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from . import pool
28from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import reduction
30from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010031from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
Benjamin Petersone711caf2008-06-11 16:44:04 +000034# Register some things for pickling
35#
36
def reduce_array(a):
    '''
    Reduce an `array.array` to its constructor plus (typecode, raw bytes)
    '''
    constructor_args = (a.typecode, a.tobytes())
    return array.array, constructor_args
# Register the reducer above so that arrays can be pickled.
reduction.register(array.array, reduce_array)

# Concrete types of the objects returned by dict.items/keys/values.
view_types = [type(view()) for view in ({}.items, {}.keys, {}.values)]

if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        # Views are reduced to a plain list holding their elements.
        as_list = list(obj)
        return list, (as_list,)

    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
48#
49# Type for identifying shared objects
50#
51
class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The pickled state is simply the three slots, in order.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Unpack first so a malformed state leaves the object untouched.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        return '{}(typeid={!r}, address={!r}, id={!r})'.format(
            self.__class__.__name__, self.typeid, self.address, self.id)
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71#
72# Function for communication with a manager's server process
73#
74
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request over connection `c` to the manager and return its answer

    The request is the tuple (id, methodname, args, kwds).  A '#RETURN'
    reply yields its payload; any other reply kind is converted to an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
84
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply from the manager into an exception object

    The exception is returned, not raised, so the caller decides where it
    is raised from.

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies carry text and are wrapped
    in RemoteError.  Any other kind yields a ValueError naming that kind.

    Raises TypeError if a textual reply does not carry a str payload.
    '''
    if kind == '#ERROR':
        return result
    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        # Explicit check rather than `assert`: the payload comes from the
        # other end of the connection and asserts are stripped under -O.
        if not isinstance(result, str):
            raise TypeError(
                'Result %r (kind %r) type is %s, not str'
                % (result, kind, type(result)))
        if kind == '#UNSERIALIZABLE':
            return RemoteError('Unserializable message: %s\n' % result)
        return RemoteError(result)
    else:
        return ValueError('Unrecognized message type %r' % (kind,))
96
class RemoteError(Exception):
    '''
    Exception wrapping error text received from the manager
    '''
    def __str__(self):
        # The wrapped text is framed by two 75-character rules.
        rule = '-' * 75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
100
101#
102# Functions for finding the method names of an object
103#
104
def all_methods(obj):
    '''
    Return a list of the names of all callable attributes of `obj`
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
115
def public_methods(obj):
    '''
    Return a list of the method names of `obj` that do not begin with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
121
122#
123# Server which is run in a process controlled by a manager
124#
125
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the only methods a client may invoke through handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener for `address` and the (empty) object tables

        Raises TypeError if `authkey` is not a bytes object.
        '''
        # Explicit check instead of `assert`, which is removed under -O.
        if not isinstance(authkey, bytes):
            raise TypeError(
                'Authkey %r is type %s, not bytes' % (authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident '0' is a placeholder entry that is not a real shared object
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()

    def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                # Poll so that the stop event set by shutdown() is noticed.
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # This is the only gate on which server methods a client may
            # call, so it must not be an `assert` (stripped under -O).
            if funcname not in self.public:
                raise AttributeError('%r unrecognized' % (funcname,))
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # Best effort: tell the client why the reply could not be sent.
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # Exceptions raised by the referent travel back as-is.
                    msg = ('#ERROR', e)
                else:
                    # A registered return typeid means the result is itself
                    # shared and answered with a token instead of a copy.
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # Try the server-side fallbacks before reporting failure.
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    # The reply could not be sent as is; send its repr.
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Used by serve_client() when a request ends in AttributeError.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            # Always wake serve_forever(), even if the reply failed.
            self.stop_event.set()

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns the pair (ident, exposed method names as a tuple).
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # With no factory the single positional argument is the
                # object to share.
                if kwds or len(args) != 1:
                    raise ValueError(
                        'Without callable, must have one non-keyword argument')
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        'Method_to_typeid %r: type %s, not dict' % (
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one reference to the shared object `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Drop one reference to `ident`, disposing of the object at zero
        '''
        with self.mutex:
            # Raised explicitly so the check also runs under -O.
            if not self.id_to_refcount[ident] >= 1:
                raise AssertionError(
                    'Id %s (%r) refcount is %s, not 1+' % (
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000399
400#
401# Class to represent state of a manager
402#
403
class State(object):
    '''
    Holder for a manager's lifecycle state, kept in the `value` slot
    '''
    # Possible contents of `value`.
    INITIAL, STARTED, SHUTDOWN = range(3)

    __slots__ = ['value']
409
410#
411# Mapping from serializer name to Listener and Client types
412#
413
# Each value is a (Listener, Client) pair; users index it by serializer name.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
418
419#
420# Definition of BaseManager
421#
422
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # Use the class's server type, as _run_server() does.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            # A 'dummy' round trip proves the server is reachable.
            dispatch(conn, None, 'dummy')
        finally:
            # The connection is only needed for this single request.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists once start() has run; without it there
        # is nothing to join.
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        if self._state.value == State.INITIAL:
            self.start()
        assert self._state.value == State.STARTED
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: the process is joined or terminated below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # Give this class its own registry so that registering here does
        # not modify the registry inherited from a base class.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Release the reference the server took in create(), now
                # that the proxy exists.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
638
639#
640# Subclass of set which gets cleared after a fork
641#
642
class ProcessLocalSet(set):
    '''
    Set registered with util.register_after_fork() to be emptied
    '''
    def __init__(self):
        def _discard_everything(obj):
            obj.clear()
        util.register_after_fork(self, _discard_everything)

    def __reduce__(self):
        # Pickle as a fresh, empty instance: the contents are not sent.
        cls = type(self)
        return cls, ()
648
649#
650# Definition of BaseProxy
651#
652
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread-local connection holder, set of ids)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # authkey precedence: explicit argument, then the manager's key,
        # then the current process's key
        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in the tls
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # Release the extra reference that came with the token, using
            # a one-shot connection that is closed afterwards.
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Take a reference on the referent and register the matching decref
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # One-shot connection: close it rather than leaving it to GC.
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Finalizer callback: give up this proxy's reference to its referent
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        # The authkey is only included while a child process is spawning.
        if context.get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
817
818#
819# Function used for unpickling
820#
821
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)

    # Inside the manager process that owns the object, return the object.
    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # Otherwise build a proxy; skip the incref if the caller disabled it
    # or the current process is flagged as `_inheriting`.
    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(process.current_process(), '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
838
839#
840# Functions to create proxies and proxy types
841#
842
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)

    # `_cache` is a deliberately shared default: one type per key.
    if cache_key in _cache:
        return _cache[cache_key]

    namespace = {}

    # NOTE(review): method names are interpolated into source text and
    # exec'd; `exposed` may come from a remote manager, so this relies on
    # the connection being trusted.
    for meth in exposed:
        exec('''def %s(self, *args, **kwds):
 return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
863
864
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the method names are requested from the manager
    at token.address.  The authkey used is the explicit `authkey`, else
    the manager's, else the current process's.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before opening any connection, so that the
    # get_methods request below is not made with authkey=None.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # Checked by BaseProxy.__reduce__ so that unpickling uses AutoProxy.
    proxy._isauto = True
    return proxy
889
890#
891# Types/callables which we will register with SyncManager
892#
893
class Namespace(object):
    '''
    Simple attribute container; its repr lists the non-underscore attributes
    '''
    def __init__(self, **kwds):
        vars(self).update(kwds)

    def __repr__(self):
        # Sort the formatted "name=value" strings, not the names.
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
        )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905
class Value(object):
    '''
    Holder for a single value together with its typecode

    The `lock` argument is accepted but is not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value
    def get(self):
        '''Return the stored value'''
        return self._value
    def set(self, value):
        '''Replace the stored value'''
        self._value = value
    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields
    # attribute-style access built on the accessors above
    value = property(get, set)
917
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of type `typecode` initialised from `sequence`

    The `lock` argument is accepted but is not used by this function.
    '''
    result = array.array(typecode, sequence)
    return result
920
921#
922# Proxy types used by SyncManager
923#
924
class IteratorProxy(BaseProxy):
    '''
    Proxy type which forwards the iterator/generator protocol methods
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        # the proxy acts as its own iterator
        return self
    def __next__(self, *args):
        forwarded = self._callmethod('__next__', args)
        return forwarded
    def send(self, *args):
        forwarded = self._callmethod('send', args)
        return forwarded
    def throw(self, *args):
        forwarded = self._callmethod('throw', args)
        return forwarded
    def close(self, *args):
        forwarded = self._callmethod('close', args)
        return forwarded
937
938
class AcquirerProxy(BaseProxy):
    '''
    Proxy type for referents with `acquire()` and `release()` methods

    Also usable as a context manager: entering calls `acquire` with no
    arguments and leaving calls `release`.
    '''
    _exposed_ = ('acquire', 'release')
    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when the caller supplied one
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)
    def release(self):
        return self._callmethod('release')
    def __enter__(self):
        return self._callmethod('acquire')
    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
950
951
class ConditionProxy(AcquirerProxy):
    '''
    Proxy type for condition-variable referents

    `wait`, `notify` and `notify_all` are forwarded to the referent.
    `wait_for` is implemented on the proxy side: `predicate` is evaluated
    in the calling process and only the individual `wait` calls are
    forwarded.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def notify(self):
        return self._callmethod('notify')
    def notify_all(self):
        return self._callmethod('notify_all')
    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have elapsed

        Returns the last value returned by `predicate`.  With a timeout of
        None the wait is unbounded.
        '''
        # A monotonic clock is used for the deadline so that adjustments
        # of the system wall clock cannot shorten or lengthen the wait.
        from time import monotonic
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                # recompute the remaining time on every wake-up
                waittime = endtime - monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
977
Benjamin Petersone711caf2008-06-11 16:44:04 +0000978
class EventProxy(BaseProxy):
    '''
    Proxy type which forwards `is_set`, `set`, `clear` and `wait`
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        # the timeout is always forwarded, even when it is None
        args = (timeout,)
        return self._callmethod('wait', args)
989
Richard Oudkerk3730a172012-06-15 18:26:07 +0100990
class BarrierProxy(BaseProxy):
    '''
    Proxy type which forwards `wait`, `abort` and `reset`

    The `parties`, `n_waiting` and `broken` properties are read from the
    referent through its `__getattribute__` method on every access.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        args = (timeout,)
        return self._callmethod('wait', args)
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    @property
    def parties(self):
        attr = ('parties',)
        return self._callmethod('__getattribute__', attr)
    @property
    def n_waiting(self):
        attr = ('n_waiting',)
        return self._callmethod('__getattribute__', attr)
    @property
    def broken(self):
        attr = ('broken',)
        return self._callmethod('__getattribute__', attr)
1008
1009
class NamespaceProxy(BaseProxy):
    '''
    Proxy type which forwards attribute access to the referent

    Attribute names starting with an underscore are handled locally on
    the proxy object itself; every other name is forwarded through
    `_callmethod` to the referent's `__getattribute__`, `__setattr__`
    or `__delattr__`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    # `startswith` is used rather than `key[0]` so that an empty attribute
    # name is forwarded like any other public name instead of raising
    # IndexError on the proxy side.
    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # fetch `_callmethod` without going back through __getattr__
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))
    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1027
1028
class ValueProxy(BaseProxy):
    '''
    Proxy type which forwards `get` and `set`, also exposed as `value`
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        args = (value,)
        return self._callmethod('set', args)
    # attribute-style access built on the forwarding accessors above
    value = property(get, set)
1036
1037
# Generated base type for ListProxy: one forwarding method per name below.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__',
    ))
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `+=` is carried out as an `extend` call on the referent
        args = (value,)
        self._callmethod('extend', args)
        return self
    def __imul__(self, value):
        # the forwarded call's result is discarded; `*=` rebinds to the proxy
        args = (value,)
        self._callmethod('__imul__', args)
        return self
1051
1052
# Generated proxy type forwarding the mapping methods named below.
# NOTE(review): 'has_key' is not a method of Python 3 dicts, so calling it
# through this proxy presumably fails on the referent -- confirm before
# relying on it.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__', '__setitem__',
    'clear', 'copy', 'get', 'has_key', 'items', 'keys',
    'pop', 'popitem', 'setdefault', 'update', 'values',
    ))
1058
1059
# Generated proxy type: only length and item get/set are forwarded.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
    )
Benjamin Petersone711caf2008-06-11 16:44:04 +00001063
1064
# Generated base type for PoolProxy (note that the generated type itself
# is named 'PoolProxy').
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close',
    'imap', 'imap_unordered', 'join',
    'map', 'map_async',
    'starmap', 'starmap_async', 'terminate',
    ))
# Maps method names to the typeid registered for their return values.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
    }
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager

    Leaving the `with` block calls `terminate()`.
    '''
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # the result of terminate() is not returned, so exceptions raised
        # inside the `with` block are never suppressed here
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001081
1082#
1083# Definition of SyncManager
1084#
1085
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.  `Queue`,
    `JoinableQueue`, `Pool`, `Value` and `Array` are registered as well.

    The class body is empty; the types are attached by the
    `SyncManager.register()` calls which follow the class definition.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
1096
# Each call passes a typeid, the callable which creates the referent and,
# where one is given, the proxy type to use for it.

# No proxy type is passed for the two queue typeids; both are backed by
# `queue.Queue`.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
# All lock-like primitives share AcquirerProxy.
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
# `Value`, `Array` and `Namespace` are the helpers defined in this module.
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (these are the typeids named in BasePoolProxy._method_to_typeid_; they
# are registered with create_method=False and without a callable)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)