blob: b9ce84b2d85ddc4d6a561b3ba36d91445a0a27f7 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
Charles-François Natalic8ce7152012-04-17 18:45:57 +020022from time import time as _time
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
Davin Potts54586472016-09-09 18:03:10 -050026from .context import reduction, get_spawning_popen
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from . import pool
28from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010030from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032#
Benjamin Petersone711caf2008-06-11 16:44:04 +000033# Register some things for pickling
34#
35
36def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000037 return array.array, (a.typecode, a.tobytes())
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010038reduction.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
40view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000041if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000042 def rebuild_as_list(obj):
43 return list, (list(obj),)
44 for view_type in view_types:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010045 reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
47#
48# Type for identifying shared objects
49#
50
51class Token(object):
52 '''
53 Type to uniquely indentify a shared object
54 '''
55 __slots__ = ('typeid', 'address', 'id')
56
57 def __init__(self, typeid, address, id):
58 (self.typeid, self.address, self.id) = (typeid, address, id)
59
60 def __getstate__(self):
61 return (self.typeid, self.address, self.id)
62
63 def __setstate__(self, state):
64 (self.typeid, self.address, self.id) = state
65
66 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +030067 return '%s(typeid=%r, address=%r, id=%r)' % \
68 (self.__class__.__name__, self.typeid, self.address, self.id)
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
70#
71# Function for communication with a manager's server process
72#
73
74def dispatch(c, id, methodname, args=(), kwds={}):
75 '''
76 Send a message to manager using connection `c` and return response
77 '''
78 c.send((id, methodname, args, kwds))
79 kind, result = c.recv()
80 if kind == '#RETURN':
81 return result
82 raise convert_to_error(kind, result)
83
84def convert_to_error(kind, result):
85 if kind == '#ERROR':
86 return result
87 elif kind == '#TRACEBACK':
88 assert type(result) is str
89 return RemoteError(result)
90 elif kind == '#UNSERIALIZABLE':
91 assert type(result) is str
92 return RemoteError('Unserializable message: %s\n' % result)
93 else:
94 return ValueError('Unrecognized message type')
95
96class RemoteError(Exception):
97 def __str__(self):
98 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
99
100#
101# Functions for finding the method names of an object
102#
103
104def all_methods(obj):
105 '''
106 Return a list of names of methods of `obj`
107 '''
108 temp = []
109 for name in dir(obj):
110 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200111 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000112 temp.append(name)
113 return temp
114
115def public_methods(obj):
116 '''
117 Return a list of names of methods of `obj` which do not start with '_'
118 '''
119 return [name for name in all_methods(obj) if name[0] != '_']
120
121#
122# Server which is run in a process controlled by a manager
123#
124
125class Server(object):
126 '''
127 Server class which runs in a process controlled by a manager object
128 '''
129 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
130 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
131
132 def __init__(self, registry, address, authkey, serializer):
133 assert isinstance(authkey, bytes)
134 self.registry = registry
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100135 self.authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136 Listener, Client = listener_client[serializer]
137
138 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100139 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000140 self.address = self.listener.address
141
Jesse Noller63b3a972009-01-21 02:15:48 +0000142 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143 self.id_to_refcount = {}
Davin Potts86a76682016-09-07 18:48:01 -0500144 self.id_to_local_proxy_obj = {}
145 self.mutex = threading.Lock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146
147 def serve_forever(self):
148 '''
149 Run the server forever
150 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100151 self.stop_event = threading.Event()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100152 process.current_process()._manager_server = self
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100154 accepter = threading.Thread(target=self.accepter)
155 accepter.daemon = True
156 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100158 while not self.stop_event.is_set():
159 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000160 except (KeyboardInterrupt, SystemExit):
161 pass
162 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100163 if sys.stdout != sys.__stdout__:
164 util.debug('resetting stdout, stderr')
165 sys.stdout = sys.__stdout__
166 sys.stderr = sys.__stderr__
167 sys.exit(0)
168
169 def accepter(self):
170 while True:
171 try:
172 c = self.listener.accept()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200173 except OSError:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100174 continue
175 t = threading.Thread(target=self.handle_request, args=(c,))
176 t.daemon = True
177 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178
179 def handle_request(self, c):
180 '''
181 Handle a new connection
182 '''
183 funcname = result = request = None
184 try:
185 connection.deliver_challenge(c, self.authkey)
186 connection.answer_challenge(c, self.authkey)
187 request = c.recv()
188 ignore, funcname, args, kwds = request
189 assert funcname in self.public, '%r unrecognized' % funcname
190 func = getattr(self, funcname)
191 except Exception:
192 msg = ('#TRACEBACK', format_exc())
193 else:
194 try:
195 result = func(c, *args, **kwds)
196 except Exception:
197 msg = ('#TRACEBACK', format_exc())
198 else:
199 msg = ('#RETURN', result)
200 try:
201 c.send(msg)
202 except Exception as e:
203 try:
204 c.send(('#TRACEBACK', format_exc()))
205 except Exception:
206 pass
207 util.info('Failure to send message: %r', msg)
208 util.info(' ... request was %r', request)
209 util.info(' ... exception was %r', e)
210
211 c.close()
212
213 def serve_client(self, conn):
214 '''
215 Handle requests from the proxies in a particular process/thread
216 '''
217 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000218 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219
220 recv = conn.recv
221 send = conn.send
222 id_to_obj = self.id_to_obj
223
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100224 while not self.stop_event.is_set():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 try:
227 methodname = obj = None
228 request = recv()
229 ident, methodname, args, kwds = request
Davin Potts86a76682016-09-07 18:48:01 -0500230 try:
231 obj, exposed, gettypeid = id_to_obj[ident]
232 except KeyError as ke:
233 try:
234 obj, exposed, gettypeid = \
235 self.id_to_local_proxy_obj[ident]
236 except KeyError as second_ke:
237 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238
239 if methodname not in exposed:
240 raise AttributeError(
241 'method %r of %r object is not in exposed=%r' %
242 (methodname, type(obj), exposed)
243 )
244
245 function = getattr(obj, methodname)
246
247 try:
248 res = function(*args, **kwds)
249 except Exception as e:
250 msg = ('#ERROR', e)
251 else:
252 typeid = gettypeid and gettypeid.get(methodname, None)
253 if typeid:
254 rident, rexposed = self.create(conn, typeid, res)
255 token = Token(typeid, self.address, rident)
256 msg = ('#PROXY', (rexposed, token))
257 else:
258 msg = ('#RETURN', res)
259
260 except AttributeError:
261 if methodname is None:
262 msg = ('#TRACEBACK', format_exc())
263 else:
264 try:
265 fallback_func = self.fallback_mapping[methodname]
266 result = fallback_func(
267 self, conn, ident, obj, *args, **kwds
268 )
269 msg = ('#RETURN', result)
270 except Exception:
271 msg = ('#TRACEBACK', format_exc())
272
273 except EOFError:
274 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000275 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000276 sys.exit(0)
277
278 except Exception:
279 msg = ('#TRACEBACK', format_exc())
280
281 try:
282 try:
283 send(msg)
284 except Exception as e:
Davin Potts37156a72016-09-08 14:40:36 -0500285 send(('#UNSERIALIZABLE', format_exc()))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286 except Exception as e:
287 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000288 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000289 util.info(' ... message was %r', msg)
290 util.info(' ... exception was %r', e)
291 conn.close()
292 sys.exit(1)
293
294 def fallback_getvalue(self, conn, ident, obj):
295 return obj
296
297 def fallback_str(self, conn, ident, obj):
298 return str(obj)
299
300 def fallback_repr(self, conn, ident, obj):
301 return repr(obj)
302
303 fallback_mapping = {
304 '__str__':fallback_str,
305 '__repr__':fallback_repr,
306 '#GETVALUE':fallback_getvalue
307 }
308
309 def dummy(self, c):
310 pass
311
312 def debug_info(self, c):
313 '''
314 Return some info --- useful to spot problems with refcounting
315 '''
Charles-François Natalia924fc72014-05-25 14:12:12 +0100316 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000317 result = []
Davin Potts86a76682016-09-07 18:48:01 -0500318 keys = list(self.id_to_refcount.keys())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000319 keys.sort()
320 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000321 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000322 result.append(' %s: refcount=%s\n %s' %
323 (ident, self.id_to_refcount[ident],
324 str(self.id_to_obj[ident][0])[:75]))
325 return '\n'.join(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000326
327 def number_of_objects(self, c):
328 '''
329 Number of shared objects
330 '''
Davin Potts86a76682016-09-07 18:48:01 -0500331 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
332 return len(self.id_to_refcount)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000333
334 def shutdown(self, c):
335 '''
336 Shutdown this process
337 '''
338 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100339 util.debug('manager received shutdown message')
340 c.send(('#RETURN', None))
341 except:
342 import traceback
343 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000344 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100345 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000346
347 def create(self, c, typeid, *args, **kwds):
348 '''
349 Create a new shared object and return its id
350 '''
Charles-François Natalia924fc72014-05-25 14:12:12 +0100351 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000352 callable, exposed, method_to_typeid, proxytype = \
353 self.registry[typeid]
354
355 if callable is None:
356 assert len(args) == 1 and not kwds
357 obj = args[0]
358 else:
359 obj = callable(*args, **kwds)
360
361 if exposed is None:
362 exposed = public_methods(obj)
363 if method_to_typeid is not None:
364 assert type(method_to_typeid) is dict
365 exposed = list(exposed) + list(method_to_typeid)
366
367 ident = '%x' % id(obj) # convert to string because xmlrpclib
368 # only has 32 bit signed integers
369 util.debug('%r callable returned object with id %r', typeid, ident)
370
371 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
372 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000373 self.id_to_refcount[ident] = 0
Davin Potts86a76682016-09-07 18:48:01 -0500374
375 self.incref(c, ident)
376 return ident, tuple(exposed)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000377
378 def get_methods(self, c, token):
379 '''
380 Return the methods of the shared object indicated by token
381 '''
382 return tuple(self.id_to_obj[token.id][1])
383
384 def accept_connection(self, c, name):
385 '''
386 Spawn a new thread to serve this connection
387 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000388 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000389 c.send(('#RETURN', None))
390 self.serve_client(c)
391
392 def incref(self, c, ident):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100393 with self.mutex:
Davin Potts86a76682016-09-07 18:48:01 -0500394 try:
395 self.id_to_refcount[ident] += 1
396 except KeyError as ke:
397 # If no external references exist but an internal (to the
398 # manager) still does and a new external reference is created
399 # from it, restore the manager's tracking of it from the
400 # previously stashed internal ref.
401 if ident in self.id_to_local_proxy_obj:
402 self.id_to_refcount[ident] = 1
403 self.id_to_obj[ident] = \
404 self.id_to_local_proxy_obj[ident]
405 obj, exposed, gettypeid = self.id_to_obj[ident]
406 util.debug('Server re-enabled tracking & INCREF %r', ident)
407 else:
408 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409
410 def decref(self, c, ident):
Davin Potts86a76682016-09-07 18:48:01 -0500411 if ident not in self.id_to_refcount and \
412 ident in self.id_to_local_proxy_obj:
413 util.debug('Server DECREF skipping %r', ident)
414 return
415
Charles-François Natalia924fc72014-05-25 14:12:12 +0100416 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000417 assert self.id_to_refcount[ident] >= 1
418 self.id_to_refcount[ident] -= 1
419 if self.id_to_refcount[ident] == 0:
Davin Potts86a76682016-09-07 18:48:01 -0500420 del self.id_to_refcount[ident]
421
422 if ident not in self.id_to_refcount:
423 # Two-step process in case the object turns out to contain other
424 # proxy objects (e.g. a managed list of managed lists).
425 # Otherwise, deleting self.id_to_obj[ident] would trigger the
426 # deleting of the stored value (another managed object) which would
427 # in turn attempt to acquire the mutex that is already held here.
428 self.id_to_obj[ident] = (None, (), None) # thread-safe
429 util.debug('disposing of obj with id %r', ident)
430 with self.mutex:
431 del self.id_to_obj[ident]
432
Benjamin Petersone711caf2008-06-11 16:44:04 +0000433
434#
435# Class to represent state of a manager
436#
437
438class State(object):
439 __slots__ = ['value']
440 INITIAL = 0
441 STARTED = 1
442 SHUTDOWN = 2
443
444#
445# Mapping from serializer name to Listener and Client types
446#
447
448listener_client = {
449 'pickle' : (connection.Listener, connection.Client),
450 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
451 }
452
453#
454# Definition of BaseManager
455#
456
457class BaseManager(object):
458 '''
459 Base class for managers
460 '''
461 _registry = {}
462 _Server = Server
463
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100464 def __init__(self, address=None, authkey=None, serializer='pickle',
465 ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000466 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100467 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000468 self._address = address # XXX not final address if eg ('', 0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100469 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000470 self._state = State()
471 self._state.value = State.INITIAL
472 self._serializer = serializer
473 self._Listener, self._Client = listener_client[serializer]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100474 self._ctx = ctx or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475
Benjamin Petersone711caf2008-06-11 16:44:04 +0000476 def get_server(self):
477 '''
478 Return server object with serve_forever() method and address attribute
479 '''
480 assert self._state.value == State.INITIAL
481 return Server(self._registry, self._address,
482 self._authkey, self._serializer)
483
484 def connect(self):
485 '''
486 Connect manager object to the server process
487 '''
488 Listener, Client = listener_client[self._serializer]
489 conn = Client(self._address, authkey=self._authkey)
490 dispatch(conn, None, 'dummy')
491 self._state.value = State.STARTED
492
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000493 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000494 '''
495 Spawn a server process for this manager object
496 '''
497 assert self._state.value == State.INITIAL
498
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200499 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000500 raise TypeError('initializer must be a callable')
501
Benjamin Petersone711caf2008-06-11 16:44:04 +0000502 # pipe over which we will retrieve address of server
503 reader, writer = connection.Pipe(duplex=False)
504
505 # spawn process which runs a server
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100506 self._process = self._ctx.Process(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000507 target=type(self)._run_server,
508 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000509 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 )
511 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000512 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000513 self._process.start()
514
515 # get address of server
516 writer.close()
517 self._address = reader.recv()
518 reader.close()
519
520 # register a finalizer
521 self._state.value = State.STARTED
522 self.shutdown = util.Finalize(
523 self, type(self)._finalize_manager,
524 args=(self._process, self._address, self._authkey,
525 self._state, self._Client),
526 exitpriority=0
527 )
528
529 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000530 def _run_server(cls, registry, address, authkey, serializer, writer,
531 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000532 '''
533 Create a server, report its address and run it
534 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000535 if initializer is not None:
536 initializer(*initargs)
537
Benjamin Petersone711caf2008-06-11 16:44:04 +0000538 # create server
539 server = cls._Server(registry, address, authkey, serializer)
540
541 # inform parent process of the server's address
542 writer.send(server.address)
543 writer.close()
544
545 # run the manager
546 util.info('manager serving at %r', server.address)
547 server.serve_forever()
548
549 def _create(self, typeid, *args, **kwds):
550 '''
551 Create a new shared object; return the token and exposed tuple
552 '''
553 assert self._state.value == State.STARTED, 'server not yet started'
554 conn = self._Client(self._address, authkey=self._authkey)
555 try:
556 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
557 finally:
558 conn.close()
559 return Token(typeid, self._address, id), exposed
560
561 def join(self, timeout=None):
562 '''
563 Join the manager process (if it has been spawned)
564 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100565 if self._process is not None:
566 self._process.join(timeout)
567 if not self._process.is_alive():
568 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000569
570 def _debug_info(self):
571 '''
572 Return some info about the servers shared objects and connections
573 '''
574 conn = self._Client(self._address, authkey=self._authkey)
575 try:
576 return dispatch(conn, None, 'debug_info')
577 finally:
578 conn.close()
579
580 def _number_of_objects(self):
581 '''
582 Return the number of shared objects
583 '''
584 conn = self._Client(self._address, authkey=self._authkey)
585 try:
586 return dispatch(conn, None, 'number_of_objects')
587 finally:
588 conn.close()
589
590 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100591 if self._state.value == State.INITIAL:
592 self.start()
593 assert self._state.value == State.STARTED
Benjamin Petersone711caf2008-06-11 16:44:04 +0000594 return self
595
596 def __exit__(self, exc_type, exc_val, exc_tb):
597 self.shutdown()
598
599 @staticmethod
600 def _finalize_manager(process, address, authkey, state, _Client):
601 '''
602 Shutdown the manager process; will be registered as a finalizer
603 '''
604 if process.is_alive():
605 util.info('sending shutdown message to manager')
606 try:
607 conn = _Client(address, authkey=authkey)
608 try:
609 dispatch(conn, None, 'shutdown')
610 finally:
611 conn.close()
612 except Exception:
613 pass
614
Richard Oudkerk3049f122012-06-15 20:08:29 +0100615 process.join(timeout=1.0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000616 if process.is_alive():
617 util.info('manager still alive')
618 if hasattr(process, 'terminate'):
619 util.info('trying to `terminate()` manager process')
620 process.terminate()
621 process.join(timeout=0.1)
622 if process.is_alive():
623 util.info('manager still alive after terminate')
624
625 state.value = State.SHUTDOWN
626 try:
627 del BaseProxy._address_to_local[address]
628 except KeyError:
629 pass
630
631 address = property(lambda self: self._address)
632
633 @classmethod
634 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
635 method_to_typeid=None, create_method=True):
636 '''
637 Register a typeid with the manager type
638 '''
639 if '_registry' not in cls.__dict__:
640 cls._registry = cls._registry.copy()
641
642 if proxytype is None:
643 proxytype = AutoProxy
644
645 exposed = exposed or getattr(proxytype, '_exposed_', None)
646
647 method_to_typeid = method_to_typeid or \
648 getattr(proxytype, '_method_to_typeid_', None)
649
650 if method_to_typeid:
651 for key, value in list(method_to_typeid.items()):
652 assert type(key) is str, '%r is not a string' % key
653 assert type(value) is str, '%r is not a string' % value
654
655 cls._registry[typeid] = (
656 callable, exposed, method_to_typeid, proxytype
657 )
658
659 if create_method:
660 def temp(self, *args, **kwds):
661 util.debug('requesting creation of a shared %r object', typeid)
662 token, exp = self._create(typeid, *args, **kwds)
663 proxy = proxytype(
664 token, self._serializer, manager=self,
665 authkey=self._authkey, exposed=exp
666 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000667 conn = self._Client(token.address, authkey=self._authkey)
668 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000669 return proxy
670 temp.__name__ = typeid
671 setattr(cls, typeid, temp)
672
673#
674# Subclass of set which get cleared after a fork
675#
676
677class ProcessLocalSet(set):
678 def __init__(self):
679 util.register_after_fork(self, lambda obj: obj.clear())
680 def __reduce__(self):
681 return type(self), ()
682
683#
684# Definition of BaseProxy
685#
686
687class BaseProxy(object):
688 '''
689 A base for proxies of shared objects
690 '''
691 _address_to_local = {}
692 _mutex = util.ForkAwareThreadLock()
693
694 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500695 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100696 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000697 tls_idset = BaseProxy._address_to_local.get(token.address, None)
698 if tls_idset is None:
699 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
700 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000701
702 # self._tls is used to record the connection used by this
703 # thread to communicate with the manager at token.address
704 self._tls = tls_idset[0]
705
706 # self._idset is used to record the identities of all shared
707 # objects for which the current process owns references and
708 # which are in the manager at token.address
709 self._idset = tls_idset[1]
710
711 self._token = token
712 self._id = self._token.id
713 self._manager = manager
714 self._serializer = serializer
715 self._Client = listener_client[serializer][1]
716
Davin Potts86a76682016-09-07 18:48:01 -0500717 # Should be set to True only when a proxy object is being created
718 # on the manager server; primary use case: nested proxy objects.
719 # RebuildProxy detects when a proxy is being created on the manager
720 # and sets this value appropriately.
721 self._owned_by_manager = manager_owned
722
Benjamin Petersone711caf2008-06-11 16:44:04 +0000723 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100724 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000725 elif self._manager is not None:
726 self._authkey = self._manager._authkey
727 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100728 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729
730 if incref:
731 self._incref()
732
733 util.register_after_fork(self, BaseProxy._after_fork)
734
735 def _connect(self):
736 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100737 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000738 if threading.current_thread().name != 'MainThread':
739 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000740 conn = self._Client(self._token.address, authkey=self._authkey)
741 dispatch(conn, None, 'accept_connection', (name,))
742 self._tls.connection = conn
743
744 def _callmethod(self, methodname, args=(), kwds={}):
745 '''
746 Try to call a method of the referrent and return a copy of the result
747 '''
748 try:
749 conn = self._tls.connection
750 except AttributeError:
751 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000752 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000753 self._connect()
754 conn = self._tls.connection
755
756 conn.send((self._id, methodname, args, kwds))
757 kind, result = conn.recv()
758
759 if kind == '#RETURN':
760 return result
761 elif kind == '#PROXY':
762 exposed, token = result
763 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100764 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000765 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000766 token, self._serializer, manager=self._manager,
767 authkey=self._authkey, exposed=exposed
768 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000769 conn = self._Client(token.address, authkey=self._authkey)
770 dispatch(conn, None, 'decref', (token.id,))
771 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000772 raise convert_to_error(kind, result)
773
774 def _getvalue(self):
775 '''
776 Get a copy of the value of the referent
777 '''
778 return self._callmethod('#GETVALUE')
779
780 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500781 if self._owned_by_manager:
782 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
783 return
784
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785 conn = self._Client(self._token.address, authkey=self._authkey)
786 dispatch(conn, None, 'incref', (self._id,))
787 util.debug('INCREF %r', self._token.id)
788
789 self._idset.add(self._id)
790
791 state = self._manager and self._manager._state
792
793 self._close = util.Finalize(
794 self, BaseProxy._decref,
795 args=(self._token, self._authkey, state,
796 self._tls, self._idset, self._Client),
797 exitpriority=10
798 )
799
800 @staticmethod
801 def _decref(token, authkey, state, tls, idset, _Client):
802 idset.discard(token.id)
803
804 # check whether manager is still alive
805 if state is None or state.value == State.STARTED:
806 # tell manager this process no longer cares about referent
807 try:
808 util.debug('DECREF %r', token.id)
809 conn = _Client(token.address, authkey=authkey)
810 dispatch(conn, None, 'decref', (token.id,))
811 except Exception as e:
812 util.debug('... decref failed %s', e)
813
814 else:
815 util.debug('DECREF %r -- manager already shutdown', token.id)
816
817 # check whether we can close this thread's connection because
818 # the process owns no more references to objects for this manager
819 if not idset and hasattr(tls, 'connection'):
820 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000821 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000822 tls.connection.close()
823 del tls.connection
824
825 def _after_fork(self):
826 self._manager = None
827 try:
828 self._incref()
829 except Exception as e:
830 # the proxy may just be for a manager which has shutdown
831 util.info('incref failed: %s' % e)
832
833 def __reduce__(self):
834 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500835 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000836 kwds['authkey'] = self._authkey
837
838 if getattr(self, '_isauto', False):
839 kwds['exposed'] = self._exposed_
840 return (RebuildProxy,
841 (AutoProxy, self._token, self._serializer, kwds))
842 else:
843 return (RebuildProxy,
844 (type(self), self._token, self._serializer, kwds))
845
846 def __deepcopy__(self, memo):
847 return self._getvalue()
848
849 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300850 return '<%s object, typeid %r at %#x>' % \
851 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000852
853 def __str__(self):
854 '''
855 Return representation of the referent (or a fall-back if that fails)
856 '''
857 try:
858 return self._callmethod('__repr__')
859 except Exception:
860 return repr(self)[:-1] + "; '__str__()' failed>"
861
862#
863# Function used for unpickling
864#
865
866def RebuildProxy(func, token, serializer, kwds):
867 '''
868 Function used for unpickling proxy objects.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869 '''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100870 server = getattr(process.current_process(), '_manager_server', None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000871 if server and server.address == token.address:
Davin Potts86a76682016-09-07 18:48:01 -0500872 util.debug('Rebuild a proxy owned by manager, token=%r', token)
873 kwds['manager_owned'] = True
874 if token.id not in server.id_to_local_proxy_obj:
875 server.id_to_local_proxy_obj[token.id] = \
876 server.id_to_obj[token.id]
877 incref = (
878 kwds.pop('incref', True) and
879 not getattr(process.current_process(), '_inheriting', False)
880 )
881 return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882
883#
884# Functions to create proxies and proxy types
885#
886
887def MakeProxyType(name, exposed, _cache={}):
888 '''
Serhiy Storchaka6a7b3a72016-04-17 08:32:47 +0300889 Return a proxy type whose methods are given by `exposed`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000890 '''
891 exposed = tuple(exposed)
892 try:
893 return _cache[(name, exposed)]
894 except KeyError:
895 pass
896
897 dic = {}
898
899 for meth in exposed:
900 exec('''def %s(self, *args, **kwds):
901 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
902
903 ProxyType = type(name, (BaseProxy,), dic)
904 ProxyType._exposed_ = exposed
905 _cache[(name, exposed)] = ProxyType
906 return ProxyType
907
908
909def AutoProxy(token, serializer, manager=None, authkey=None,
910 exposed=None, incref=True):
911 '''
912 Return an auto-proxy for `token`
913 '''
914 _Client = listener_client[serializer][1]
915
916 if exposed is None:
917 conn = _Client(token.address, authkey=authkey)
918 try:
919 exposed = dispatch(conn, None, 'get_methods', (token,))
920 finally:
921 conn.close()
922
923 if authkey is None and manager is not None:
924 authkey = manager._authkey
925 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100926 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927
928 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
929 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
930 incref=incref)
931 proxy._isauto = True
932 return proxy
933
934#
935# Types/callables which we will register with SyncManager
936#
937
938class Namespace(object):
939 def __init__(self, **kwds):
940 self.__dict__.update(kwds)
941 def __repr__(self):
942 items = list(self.__dict__.items())
943 temp = []
944 for name, value in items:
945 if not name.startswith('_'):
946 temp.append('%s=%r' % (name, value))
947 temp.sort()
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300948 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000949
950class Value(object):
951 def __init__(self, typecode, value, lock=True):
952 self._typecode = typecode
953 self._value = value
954 def get(self):
955 return self._value
956 def set(self, value):
957 self._value = value
958 def __repr__(self):
959 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
960 value = property(get, set)
961
962def Array(typecode, sequence, lock=True):
963 return array.array(typecode, sequence)
964
965#
966# Proxy types used by SyncManager
967#
968
969class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000970 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000971 def __iter__(self):
972 return self
973 def __next__(self, *args):
974 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000975 def send(self, *args):
976 return self._callmethod('send', args)
977 def throw(self, *args):
978 return self._callmethod('throw', args)
979 def close(self, *args):
980 return self._callmethod('close', args)
981
982
983class AcquirerProxy(BaseProxy):
984 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +0100985 def acquire(self, blocking=True, timeout=None):
986 args = (blocking,) if timeout is None else (blocking, timeout)
987 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000988 def release(self):
989 return self._callmethod('release')
990 def __enter__(self):
991 return self._callmethod('acquire')
992 def __exit__(self, exc_type, exc_val, exc_tb):
993 return self._callmethod('release')
994
995
996class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000997 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000998 def wait(self, timeout=None):
999 return self._callmethod('wait', (timeout,))
1000 def notify(self):
1001 return self._callmethod('notify')
1002 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +00001003 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001004 def wait_for(self, predicate, timeout=None):
1005 result = predicate()
1006 if result:
1007 return result
1008 if timeout is not None:
1009 endtime = _time() + timeout
1010 else:
1011 endtime = None
1012 waittime = None
1013 while not result:
1014 if endtime is not None:
1015 waittime = endtime - _time()
1016 if waittime <= 0:
1017 break
1018 self.wait(waittime)
1019 result = predicate()
1020 return result
1021
Benjamin Petersone711caf2008-06-11 16:44:04 +00001022
1023class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +00001024 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +00001026 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027 def set(self):
1028 return self._callmethod('set')
1029 def clear(self):
1030 return self._callmethod('clear')
1031 def wait(self, timeout=None):
1032 return self._callmethod('wait', (timeout,))
1033
Richard Oudkerk3730a172012-06-15 18:26:07 +01001034
1035class BarrierProxy(BaseProxy):
1036 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1037 def wait(self, timeout=None):
1038 return self._callmethod('wait', (timeout,))
1039 def abort(self):
1040 return self._callmethod('abort')
1041 def reset(self):
1042 return self._callmethod('reset')
1043 @property
1044 def parties(self):
1045 return self._callmethod('__getattribute__', ('parties',))
1046 @property
1047 def n_waiting(self):
1048 return self._callmethod('__getattribute__', ('n_waiting',))
1049 @property
1050 def broken(self):
1051 return self._callmethod('__getattribute__', ('broken',))
1052
1053
Benjamin Petersone711caf2008-06-11 16:44:04 +00001054class NamespaceProxy(BaseProxy):
1055 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1056 def __getattr__(self, key):
1057 if key[0] == '_':
1058 return object.__getattribute__(self, key)
1059 callmethod = object.__getattribute__(self, '_callmethod')
1060 return callmethod('__getattribute__', (key,))
1061 def __setattr__(self, key, value):
1062 if key[0] == '_':
1063 return object.__setattr__(self, key, value)
1064 callmethod = object.__getattribute__(self, '_callmethod')
1065 return callmethod('__setattr__', (key, value))
1066 def __delattr__(self, key):
1067 if key[0] == '_':
1068 return object.__delattr__(self, key)
1069 callmethod = object.__getattribute__(self, '_callmethod')
1070 return callmethod('__delattr__', (key,))
1071
1072
1073class ValueProxy(BaseProxy):
1074 _exposed_ = ('get', 'set')
1075 def get(self):
1076 return self._callmethod('get')
1077 def set(self, value):
1078 return self._callmethod('set', (value,))
1079 value = property(get, set)
1080
1081
1082BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001083 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1084 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001085 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1086 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001087 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088class ListProxy(BaseListProxy):
1089 def __iadd__(self, value):
1090 self._callmethod('extend', (value,))
1091 return self
1092 def __imul__(self, value):
1093 self._callmethod('__imul__', (value,))
1094 return self
1095
1096
1097DictProxy = MakeProxyType('DictProxy', (
1098 '__contains__', '__delitem__', '__getitem__', '__len__',
1099 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1100 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1101 ))
1102
1103
1104ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001105 '__len__', '__getitem__', '__setitem__'
1106 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001107
1108
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001109BasePoolProxy = MakeProxyType('PoolProxy', (
Benjamin Petersone711caf2008-06-11 16:44:04 +00001110 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001111 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001112 ))
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001113BasePoolProxy._method_to_typeid_ = {
Benjamin Petersone711caf2008-06-11 16:44:04 +00001114 'apply_async': 'AsyncResult',
1115 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001116 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001117 'imap': 'Iterator',
1118 'imap_unordered': 'Iterator'
1119 }
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001120class PoolProxy(BasePoolProxy):
1121 def __enter__(self):
1122 return self
1123 def __exit__(self, exc_type, exc_val, exc_tb):
1124 self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001125
1126#
1127# Definition of SyncManager
1128#
1129
1130class SyncManager(BaseManager):
1131 '''
1132 Subclass of `BaseManager` which supports a number of shared object types.
1133
1134 The types registered are those intended for the synchronization
1135 of threads, plus `dict`, `list` and `Namespace`.
1136
1137 The `multiprocessing.Manager()` function creates started instances of
1138 this class.
1139 '''
1140
1141SyncManager.register('Queue', queue.Queue)
1142SyncManager.register('JoinableQueue', queue.Queue)
1143SyncManager.register('Event', threading.Event, EventProxy)
1144SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1145SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1146SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1147SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1148 AcquirerProxy)
1149SyncManager.register('Condition', threading.Condition, ConditionProxy)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001150SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001151SyncManager.register('Pool', pool.Pool, PoolProxy)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001152SyncManager.register('list', list, ListProxy)
1153SyncManager.register('dict', dict, DictProxy)
1154SyncManager.register('Value', Value, ValueProxy)
1155SyncManager.register('Array', Array, ArrayProxy)
1156SyncManager.register('Namespace', Namespace, NamespaceProxy)
1157
1158# types returned by methods of PoolProxy
1159SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1160SyncManager.register('AsyncResult', create_method=False)