blob: 43dd02a5a391c12c8d239355ff009b79f92691f6 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
Charles-François Natalic8ce7152012-04-17 18:45:57 +020022from time import time as _time
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
Davin Potts54586472016-09-09 18:03:10 -050026from .context import reduction, get_spawning_popen
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from . import pool
28from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010030from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032#
Benjamin Petersone711caf2008-06-11 16:44:04 +000033# Register some things for pickling
34#
35
36def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000037 return array.array, (a.typecode, a.tobytes())
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010038reduction.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
40view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000041if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000042 def rebuild_as_list(obj):
43 return list, (list(obj),)
44 for view_type in view_types:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010045 reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
47#
48# Type for identifying shared objects
49#
50
51class Token(object):
52 '''
53 Type to uniquely indentify a shared object
54 '''
55 __slots__ = ('typeid', 'address', 'id')
56
57 def __init__(self, typeid, address, id):
58 (self.typeid, self.address, self.id) = (typeid, address, id)
59
60 def __getstate__(self):
61 return (self.typeid, self.address, self.id)
62
63 def __setstate__(self, state):
64 (self.typeid, self.address, self.id) = state
65
66 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +030067 return '%s(typeid=%r, address=%r, id=%r)' % \
68 (self.__class__.__name__, self.typeid, self.address, self.id)
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
70#
71# Function for communication with a manager's server process
72#
73
74def dispatch(c, id, methodname, args=(), kwds={}):
75 '''
76 Send a message to manager using connection `c` and return response
77 '''
78 c.send((id, methodname, args, kwds))
79 kind, result = c.recv()
80 if kind == '#RETURN':
81 return result
82 raise convert_to_error(kind, result)
83
84def convert_to_error(kind, result):
85 if kind == '#ERROR':
86 return result
87 elif kind == '#TRACEBACK':
88 assert type(result) is str
89 return RemoteError(result)
90 elif kind == '#UNSERIALIZABLE':
91 assert type(result) is str
92 return RemoteError('Unserializable message: %s\n' % result)
93 else:
94 return ValueError('Unrecognized message type')
95
96class RemoteError(Exception):
97 def __str__(self):
98 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
99
100#
101# Functions for finding the method names of an object
102#
103
104def all_methods(obj):
105 '''
106 Return a list of names of methods of `obj`
107 '''
108 temp = []
109 for name in dir(obj):
110 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200111 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000112 temp.append(name)
113 return temp
114
115def public_methods(obj):
116 '''
117 Return a list of names of methods of `obj` which do not start with '_'
118 '''
119 return [name for name in all_methods(obj) if name[0] != '_']
120
121#
122# Server which is run in a process controlled by a manager
123#
124
125class Server(object):
126 '''
127 Server class which runs in a process controlled by a manager object
128 '''
129 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
130 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
131
132 def __init__(self, registry, address, authkey, serializer):
133 assert isinstance(authkey, bytes)
134 self.registry = registry
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100135 self.authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136 Listener, Client = listener_client[serializer]
137
138 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100139 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000140 self.address = self.listener.address
141
Jesse Noller63b3a972009-01-21 02:15:48 +0000142 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143 self.id_to_refcount = {}
Davin Potts86a76682016-09-07 18:48:01 -0500144 self.id_to_local_proxy_obj = {}
145 self.mutex = threading.Lock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146
147 def serve_forever(self):
148 '''
149 Run the server forever
150 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100151 self.stop_event = threading.Event()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100152 process.current_process()._manager_server = self
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100154 accepter = threading.Thread(target=self.accepter)
155 accepter.daemon = True
156 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100158 while not self.stop_event.is_set():
159 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000160 except (KeyboardInterrupt, SystemExit):
161 pass
162 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100163 if sys.stdout != sys.__stdout__:
164 util.debug('resetting stdout, stderr')
165 sys.stdout = sys.__stdout__
166 sys.stderr = sys.__stderr__
167 sys.exit(0)
168
169 def accepter(self):
170 while True:
171 try:
172 c = self.listener.accept()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200173 except OSError:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100174 continue
175 t = threading.Thread(target=self.handle_request, args=(c,))
176 t.daemon = True
177 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178
179 def handle_request(self, c):
180 '''
181 Handle a new connection
182 '''
183 funcname = result = request = None
184 try:
185 connection.deliver_challenge(c, self.authkey)
186 connection.answer_challenge(c, self.authkey)
187 request = c.recv()
188 ignore, funcname, args, kwds = request
189 assert funcname in self.public, '%r unrecognized' % funcname
190 func = getattr(self, funcname)
191 except Exception:
192 msg = ('#TRACEBACK', format_exc())
193 else:
194 try:
195 result = func(c, *args, **kwds)
196 except Exception:
197 msg = ('#TRACEBACK', format_exc())
198 else:
199 msg = ('#RETURN', result)
200 try:
201 c.send(msg)
202 except Exception as e:
203 try:
204 c.send(('#TRACEBACK', format_exc()))
205 except Exception:
206 pass
207 util.info('Failure to send message: %r', msg)
208 util.info(' ... request was %r', request)
209 util.info(' ... exception was %r', e)
210
211 c.close()
212
213 def serve_client(self, conn):
214 '''
215 Handle requests from the proxies in a particular process/thread
216 '''
217 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000218 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219
220 recv = conn.recv
221 send = conn.send
222 id_to_obj = self.id_to_obj
223
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100224 while not self.stop_event.is_set():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 try:
227 methodname = obj = None
228 request = recv()
229 ident, methodname, args, kwds = request
Davin Potts86a76682016-09-07 18:48:01 -0500230 try:
231 obj, exposed, gettypeid = id_to_obj[ident]
232 except KeyError as ke:
233 try:
234 obj, exposed, gettypeid = \
235 self.id_to_local_proxy_obj[ident]
236 except KeyError as second_ke:
237 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238
239 if methodname not in exposed:
240 raise AttributeError(
241 'method %r of %r object is not in exposed=%r' %
242 (methodname, type(obj), exposed)
243 )
244
245 function = getattr(obj, methodname)
246
247 try:
248 res = function(*args, **kwds)
249 except Exception as e:
250 msg = ('#ERROR', e)
251 else:
252 typeid = gettypeid and gettypeid.get(methodname, None)
253 if typeid:
254 rident, rexposed = self.create(conn, typeid, res)
255 token = Token(typeid, self.address, rident)
256 msg = ('#PROXY', (rexposed, token))
257 else:
258 msg = ('#RETURN', res)
259
260 except AttributeError:
261 if methodname is None:
262 msg = ('#TRACEBACK', format_exc())
263 else:
264 try:
265 fallback_func = self.fallback_mapping[methodname]
266 result = fallback_func(
267 self, conn, ident, obj, *args, **kwds
268 )
269 msg = ('#RETURN', result)
270 except Exception:
271 msg = ('#TRACEBACK', format_exc())
272
273 except EOFError:
274 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000275 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000276 sys.exit(0)
277
278 except Exception:
279 msg = ('#TRACEBACK', format_exc())
280
281 try:
282 try:
283 send(msg)
284 except Exception as e:
Davin Potts37156a72016-09-08 14:40:36 -0500285 send(('#UNSERIALIZABLE', format_exc()))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286 except Exception as e:
287 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000288 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000289 util.info(' ... message was %r', msg)
290 util.info(' ... exception was %r', e)
291 conn.close()
292 sys.exit(1)
293
294 def fallback_getvalue(self, conn, ident, obj):
295 return obj
296
297 def fallback_str(self, conn, ident, obj):
298 return str(obj)
299
300 def fallback_repr(self, conn, ident, obj):
301 return repr(obj)
302
303 fallback_mapping = {
304 '__str__':fallback_str,
305 '__repr__':fallback_repr,
306 '#GETVALUE':fallback_getvalue
307 }
308
309 def dummy(self, c):
310 pass
311
312 def debug_info(self, c):
313 '''
314 Return some info --- useful to spot problems with refcounting
315 '''
Charles-François Natalia924fc72014-05-25 14:12:12 +0100316 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000317 result = []
Davin Potts86a76682016-09-07 18:48:01 -0500318 keys = list(self.id_to_refcount.keys())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000319 keys.sort()
320 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000321 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000322 result.append(' %s: refcount=%s\n %s' %
323 (ident, self.id_to_refcount[ident],
324 str(self.id_to_obj[ident][0])[:75]))
325 return '\n'.join(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000326
327 def number_of_objects(self, c):
328 '''
329 Number of shared objects
330 '''
Davin Potts86a76682016-09-07 18:48:01 -0500331 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
332 return len(self.id_to_refcount)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000333
334 def shutdown(self, c):
335 '''
336 Shutdown this process
337 '''
338 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100339 util.debug('manager received shutdown message')
340 c.send(('#RETURN', None))
341 except:
342 import traceback
343 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000344 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100345 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000346
347 def create(self, c, typeid, *args, **kwds):
348 '''
349 Create a new shared object and return its id
350 '''
Charles-François Natalia924fc72014-05-25 14:12:12 +0100351 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000352 callable, exposed, method_to_typeid, proxytype = \
353 self.registry[typeid]
354
355 if callable is None:
356 assert len(args) == 1 and not kwds
357 obj = args[0]
358 else:
359 obj = callable(*args, **kwds)
360
361 if exposed is None:
362 exposed = public_methods(obj)
363 if method_to_typeid is not None:
364 assert type(method_to_typeid) is dict
365 exposed = list(exposed) + list(method_to_typeid)
366
367 ident = '%x' % id(obj) # convert to string because xmlrpclib
368 # only has 32 bit signed integers
369 util.debug('%r callable returned object with id %r', typeid, ident)
370
371 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
372 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000373 self.id_to_refcount[ident] = 0
Davin Potts86a76682016-09-07 18:48:01 -0500374
375 self.incref(c, ident)
376 return ident, tuple(exposed)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000377
378 def get_methods(self, c, token):
379 '''
380 Return the methods of the shared object indicated by token
381 '''
382 return tuple(self.id_to_obj[token.id][1])
383
384 def accept_connection(self, c, name):
385 '''
386 Spawn a new thread to serve this connection
387 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000388 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000389 c.send(('#RETURN', None))
390 self.serve_client(c)
391
392 def incref(self, c, ident):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100393 with self.mutex:
Davin Potts86a76682016-09-07 18:48:01 -0500394 try:
395 self.id_to_refcount[ident] += 1
396 except KeyError as ke:
397 # If no external references exist but an internal (to the
398 # manager) still does and a new external reference is created
399 # from it, restore the manager's tracking of it from the
400 # previously stashed internal ref.
401 if ident in self.id_to_local_proxy_obj:
402 self.id_to_refcount[ident] = 1
403 self.id_to_obj[ident] = \
404 self.id_to_local_proxy_obj[ident]
405 obj, exposed, gettypeid = self.id_to_obj[ident]
406 util.debug('Server re-enabled tracking & INCREF %r', ident)
407 else:
408 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409
410 def decref(self, c, ident):
Davin Potts86a76682016-09-07 18:48:01 -0500411 if ident not in self.id_to_refcount and \
412 ident in self.id_to_local_proxy_obj:
413 util.debug('Server DECREF skipping %r', ident)
414 return
415
Charles-François Natalia924fc72014-05-25 14:12:12 +0100416 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000417 assert self.id_to_refcount[ident] >= 1
418 self.id_to_refcount[ident] -= 1
419 if self.id_to_refcount[ident] == 0:
Davin Potts86a76682016-09-07 18:48:01 -0500420 del self.id_to_refcount[ident]
421
422 if ident not in self.id_to_refcount:
423 # Two-step process in case the object turns out to contain other
424 # proxy objects (e.g. a managed list of managed lists).
425 # Otherwise, deleting self.id_to_obj[ident] would trigger the
426 # deleting of the stored value (another managed object) which would
427 # in turn attempt to acquire the mutex that is already held here.
428 self.id_to_obj[ident] = (None, (), None) # thread-safe
429 util.debug('disposing of obj with id %r', ident)
430 with self.mutex:
431 del self.id_to_obj[ident]
432
Benjamin Petersone711caf2008-06-11 16:44:04 +0000433
434#
435# Class to represent state of a manager
436#
437
438class State(object):
439 __slots__ = ['value']
440 INITIAL = 0
441 STARTED = 1
442 SHUTDOWN = 2
443
444#
445# Mapping from serializer name to Listener and Client types
446#
447
448listener_client = {
449 'pickle' : (connection.Listener, connection.Client),
450 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
451 }
452
453#
454# Definition of BaseManager
455#
456
457class BaseManager(object):
458 '''
459 Base class for managers
460 '''
461 _registry = {}
462 _Server = Server
463
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100464 def __init__(self, address=None, authkey=None, serializer='pickle',
465 ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000466 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100467 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000468 self._address = address # XXX not final address if eg ('', 0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100469 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000470 self._state = State()
471 self._state.value = State.INITIAL
472 self._serializer = serializer
473 self._Listener, self._Client = listener_client[serializer]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100474 self._ctx = ctx or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475
Benjamin Petersone711caf2008-06-11 16:44:04 +0000476 def get_server(self):
477 '''
478 Return server object with serve_forever() method and address attribute
479 '''
480 assert self._state.value == State.INITIAL
481 return Server(self._registry, self._address,
482 self._authkey, self._serializer)
483
484 def connect(self):
485 '''
486 Connect manager object to the server process
487 '''
488 Listener, Client = listener_client[self._serializer]
489 conn = Client(self._address, authkey=self._authkey)
490 dispatch(conn, None, 'dummy')
491 self._state.value = State.STARTED
492
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000493 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000494 '''
495 Spawn a server process for this manager object
496 '''
497 assert self._state.value == State.INITIAL
498
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200499 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000500 raise TypeError('initializer must be a callable')
501
Benjamin Petersone711caf2008-06-11 16:44:04 +0000502 # pipe over which we will retrieve address of server
503 reader, writer = connection.Pipe(duplex=False)
504
505 # spawn process which runs a server
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100506 self._process = self._ctx.Process(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000507 target=type(self)._run_server,
508 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000509 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 )
511 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000512 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000513 self._process.start()
514
515 # get address of server
516 writer.close()
517 self._address = reader.recv()
518 reader.close()
519
520 # register a finalizer
521 self._state.value = State.STARTED
522 self.shutdown = util.Finalize(
523 self, type(self)._finalize_manager,
524 args=(self._process, self._address, self._authkey,
525 self._state, self._Client),
526 exitpriority=0
527 )
528
529 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000530 def _run_server(cls, registry, address, authkey, serializer, writer,
531 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000532 '''
533 Create a server, report its address and run it
534 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000535 if initializer is not None:
536 initializer(*initargs)
537
Benjamin Petersone711caf2008-06-11 16:44:04 +0000538 # create server
539 server = cls._Server(registry, address, authkey, serializer)
540
541 # inform parent process of the server's address
542 writer.send(server.address)
543 writer.close()
544
545 # run the manager
546 util.info('manager serving at %r', server.address)
547 server.serve_forever()
548
549 def _create(self, typeid, *args, **kwds):
550 '''
551 Create a new shared object; return the token and exposed tuple
552 '''
553 assert self._state.value == State.STARTED, 'server not yet started'
554 conn = self._Client(self._address, authkey=self._authkey)
555 try:
556 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
557 finally:
558 conn.close()
559 return Token(typeid, self._address, id), exposed
560
561 def join(self, timeout=None):
562 '''
563 Join the manager process (if it has been spawned)
564 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100565 if self._process is not None:
566 self._process.join(timeout)
567 if not self._process.is_alive():
568 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000569
570 def _debug_info(self):
571 '''
572 Return some info about the servers shared objects and connections
573 '''
574 conn = self._Client(self._address, authkey=self._authkey)
575 try:
576 return dispatch(conn, None, 'debug_info')
577 finally:
578 conn.close()
579
580 def _number_of_objects(self):
581 '''
582 Return the number of shared objects
583 '''
584 conn = self._Client(self._address, authkey=self._authkey)
585 try:
586 return dispatch(conn, None, 'number_of_objects')
587 finally:
588 conn.close()
589
590 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100591 if self._state.value == State.INITIAL:
592 self.start()
593 assert self._state.value == State.STARTED
Benjamin Petersone711caf2008-06-11 16:44:04 +0000594 return self
595
596 def __exit__(self, exc_type, exc_val, exc_tb):
597 self.shutdown()
598
599 @staticmethod
600 def _finalize_manager(process, address, authkey, state, _Client):
601 '''
602 Shutdown the manager process; will be registered as a finalizer
603 '''
604 if process.is_alive():
605 util.info('sending shutdown message to manager')
606 try:
607 conn = _Client(address, authkey=authkey)
608 try:
609 dispatch(conn, None, 'shutdown')
610 finally:
611 conn.close()
612 except Exception:
613 pass
614
Richard Oudkerk3049f122012-06-15 20:08:29 +0100615 process.join(timeout=1.0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000616 if process.is_alive():
617 util.info('manager still alive')
618 if hasattr(process, 'terminate'):
619 util.info('trying to `terminate()` manager process')
620 process.terminate()
621 process.join(timeout=0.1)
622 if process.is_alive():
623 util.info('manager still alive after terminate')
624
625 state.value = State.SHUTDOWN
626 try:
627 del BaseProxy._address_to_local[address]
628 except KeyError:
629 pass
630
Serhiy Storchakabdf6b912017-03-19 08:40:32 +0200631 @property
632 def address(self):
633 return self._address
Benjamin Petersone711caf2008-06-11 16:44:04 +0000634
635 @classmethod
636 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
637 method_to_typeid=None, create_method=True):
638 '''
639 Register a typeid with the manager type
640 '''
641 if '_registry' not in cls.__dict__:
642 cls._registry = cls._registry.copy()
643
644 if proxytype is None:
645 proxytype = AutoProxy
646
647 exposed = exposed or getattr(proxytype, '_exposed_', None)
648
649 method_to_typeid = method_to_typeid or \
650 getattr(proxytype, '_method_to_typeid_', None)
651
652 if method_to_typeid:
653 for key, value in list(method_to_typeid.items()):
654 assert type(key) is str, '%r is not a string' % key
655 assert type(value) is str, '%r is not a string' % value
656
657 cls._registry[typeid] = (
658 callable, exposed, method_to_typeid, proxytype
659 )
660
661 if create_method:
662 def temp(self, *args, **kwds):
663 util.debug('requesting creation of a shared %r object', typeid)
664 token, exp = self._create(typeid, *args, **kwds)
665 proxy = proxytype(
666 token, self._serializer, manager=self,
667 authkey=self._authkey, exposed=exp
668 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000669 conn = self._Client(token.address, authkey=self._authkey)
670 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000671 return proxy
672 temp.__name__ = typeid
673 setattr(cls, typeid, temp)
674
675#
676# Subclass of set which get cleared after a fork
677#
678
679class ProcessLocalSet(set):
680 def __init__(self):
681 util.register_after_fork(self, lambda obj: obj.clear())
682 def __reduce__(self):
683 return type(self), ()
684
685#
686# Definition of BaseProxy
687#
688
689class BaseProxy(object):
690 '''
691 A base for proxies of shared objects
692 '''
693 _address_to_local = {}
694 _mutex = util.ForkAwareThreadLock()
695
696 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500697 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100698 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000699 tls_idset = BaseProxy._address_to_local.get(token.address, None)
700 if tls_idset is None:
701 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
702 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000703
704 # self._tls is used to record the connection used by this
705 # thread to communicate with the manager at token.address
706 self._tls = tls_idset[0]
707
708 # self._idset is used to record the identities of all shared
709 # objects for which the current process owns references and
710 # which are in the manager at token.address
711 self._idset = tls_idset[1]
712
713 self._token = token
714 self._id = self._token.id
715 self._manager = manager
716 self._serializer = serializer
717 self._Client = listener_client[serializer][1]
718
Davin Potts86a76682016-09-07 18:48:01 -0500719 # Should be set to True only when a proxy object is being created
720 # on the manager server; primary use case: nested proxy objects.
721 # RebuildProxy detects when a proxy is being created on the manager
722 # and sets this value appropriately.
723 self._owned_by_manager = manager_owned
724
Benjamin Petersone711caf2008-06-11 16:44:04 +0000725 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100726 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000727 elif self._manager is not None:
728 self._authkey = self._manager._authkey
729 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100730 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000731
732 if incref:
733 self._incref()
734
735 util.register_after_fork(self, BaseProxy._after_fork)
736
737 def _connect(self):
738 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100739 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000740 if threading.current_thread().name != 'MainThread':
741 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000742 conn = self._Client(self._token.address, authkey=self._authkey)
743 dispatch(conn, None, 'accept_connection', (name,))
744 self._tls.connection = conn
745
746 def _callmethod(self, methodname, args=(), kwds={}):
747 '''
748 Try to call a method of the referrent and return a copy of the result
749 '''
750 try:
751 conn = self._tls.connection
752 except AttributeError:
753 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000754 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000755 self._connect()
756 conn = self._tls.connection
757
758 conn.send((self._id, methodname, args, kwds))
759 kind, result = conn.recv()
760
761 if kind == '#RETURN':
762 return result
763 elif kind == '#PROXY':
764 exposed, token = result
765 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100766 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000767 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000768 token, self._serializer, manager=self._manager,
769 authkey=self._authkey, exposed=exposed
770 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000771 conn = self._Client(token.address, authkey=self._authkey)
772 dispatch(conn, None, 'decref', (token.id,))
773 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000774 raise convert_to_error(kind, result)
775
776 def _getvalue(self):
777 '''
778 Get a copy of the value of the referent
779 '''
780 return self._callmethod('#GETVALUE')
781
782 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500783 if self._owned_by_manager:
784 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
785 return
786
Benjamin Petersone711caf2008-06-11 16:44:04 +0000787 conn = self._Client(self._token.address, authkey=self._authkey)
788 dispatch(conn, None, 'incref', (self._id,))
789 util.debug('INCREF %r', self._token.id)
790
791 self._idset.add(self._id)
792
793 state = self._manager and self._manager._state
794
795 self._close = util.Finalize(
796 self, BaseProxy._decref,
797 args=(self._token, self._authkey, state,
798 self._tls, self._idset, self._Client),
799 exitpriority=10
800 )
801
802 @staticmethod
803 def _decref(token, authkey, state, tls, idset, _Client):
804 idset.discard(token.id)
805
806 # check whether manager is still alive
807 if state is None or state.value == State.STARTED:
808 # tell manager this process no longer cares about referent
809 try:
810 util.debug('DECREF %r', token.id)
811 conn = _Client(token.address, authkey=authkey)
812 dispatch(conn, None, 'decref', (token.id,))
813 except Exception as e:
814 util.debug('... decref failed %s', e)
815
816 else:
817 util.debug('DECREF %r -- manager already shutdown', token.id)
818
819 # check whether we can close this thread's connection because
820 # the process owns no more references to objects for this manager
821 if not idset and hasattr(tls, 'connection'):
822 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000823 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000824 tls.connection.close()
825 del tls.connection
826
827 def _after_fork(self):
828 self._manager = None
829 try:
830 self._incref()
831 except Exception as e:
832 # the proxy may just be for a manager which has shutdown
833 util.info('incref failed: %s' % e)
834
835 def __reduce__(self):
836 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500837 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 kwds['authkey'] = self._authkey
839
840 if getattr(self, '_isauto', False):
841 kwds['exposed'] = self._exposed_
842 return (RebuildProxy,
843 (AutoProxy, self._token, self._serializer, kwds))
844 else:
845 return (RebuildProxy,
846 (type(self), self._token, self._serializer, kwds))
847
848 def __deepcopy__(self, memo):
849 return self._getvalue()
850
851 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300852 return '<%s object, typeid %r at %#x>' % \
853 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000854
855 def __str__(self):
856 '''
857 Return representation of the referent (or a fall-back if that fails)
858 '''
859 try:
860 return self._callmethod('__repr__')
861 except Exception:
862 return repr(self)[:-1] + "; '__str__()' failed>"
863
864#
865# Function used for unpickling
866#
867
868def RebuildProxy(func, token, serializer, kwds):
869 '''
870 Function used for unpickling proxy objects.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000871 '''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100872 server = getattr(process.current_process(), '_manager_server', None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000873 if server and server.address == token.address:
Davin Potts86a76682016-09-07 18:48:01 -0500874 util.debug('Rebuild a proxy owned by manager, token=%r', token)
875 kwds['manager_owned'] = True
876 if token.id not in server.id_to_local_proxy_obj:
877 server.id_to_local_proxy_obj[token.id] = \
878 server.id_to_obj[token.id]
879 incref = (
880 kwds.pop('incref', True) and
881 not getattr(process.current_process(), '_inheriting', False)
882 )
883 return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000884
885#
886# Functions to create proxies and proxy types
887#
888
889def MakeProxyType(name, exposed, _cache={}):
890 '''
Serhiy Storchaka6a7b3a72016-04-17 08:32:47 +0300891 Return a proxy type whose methods are given by `exposed`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892 '''
893 exposed = tuple(exposed)
894 try:
895 return _cache[(name, exposed)]
896 except KeyError:
897 pass
898
899 dic = {}
900
901 for meth in exposed:
902 exec('''def %s(self, *args, **kwds):
903 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
904
905 ProxyType = type(name, (BaseProxy,), dic)
906 ProxyType._exposed_ = exposed
907 _cache[(name, exposed)] = ProxyType
908 return ProxyType
909
910
911def AutoProxy(token, serializer, manager=None, authkey=None,
912 exposed=None, incref=True):
913 '''
914 Return an auto-proxy for `token`
915 '''
916 _Client = listener_client[serializer][1]
917
918 if exposed is None:
919 conn = _Client(token.address, authkey=authkey)
920 try:
921 exposed = dispatch(conn, None, 'get_methods', (token,))
922 finally:
923 conn.close()
924
925 if authkey is None and manager is not None:
926 authkey = manager._authkey
927 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100928 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000929
930 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
931 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
932 incref=incref)
933 proxy._isauto = True
934 return proxy
935
936#
937# Types/callables which we will register with SyncManager
938#
939
940class Namespace(object):
941 def __init__(self, **kwds):
942 self.__dict__.update(kwds)
943 def __repr__(self):
944 items = list(self.__dict__.items())
945 temp = []
946 for name, value in items:
947 if not name.startswith('_'):
948 temp.append('%s=%r' % (name, value))
949 temp.sort()
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300950 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000951
952class Value(object):
953 def __init__(self, typecode, value, lock=True):
954 self._typecode = typecode
955 self._value = value
956 def get(self):
957 return self._value
958 def set(self, value):
959 self._value = value
960 def __repr__(self):
961 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
962 value = property(get, set)
963
964def Array(typecode, sequence, lock=True):
965 return array.array(typecode, sequence)
966
967#
968# Proxy types used by SyncManager
969#
970
971class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000972 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000973 def __iter__(self):
974 return self
975 def __next__(self, *args):
976 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000977 def send(self, *args):
978 return self._callmethod('send', args)
979 def throw(self, *args):
980 return self._callmethod('throw', args)
981 def close(self, *args):
982 return self._callmethod('close', args)
983
984
985class AcquirerProxy(BaseProxy):
986 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +0100987 def acquire(self, blocking=True, timeout=None):
988 args = (blocking,) if timeout is None else (blocking, timeout)
989 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000990 def release(self):
991 return self._callmethod('release')
992 def __enter__(self):
993 return self._callmethod('acquire')
994 def __exit__(self, exc_type, exc_val, exc_tb):
995 return self._callmethod('release')
996
997
998class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000999 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001000 def wait(self, timeout=None):
1001 return self._callmethod('wait', (timeout,))
1002 def notify(self):
1003 return self._callmethod('notify')
1004 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +00001005 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001006 def wait_for(self, predicate, timeout=None):
1007 result = predicate()
1008 if result:
1009 return result
1010 if timeout is not None:
1011 endtime = _time() + timeout
1012 else:
1013 endtime = None
1014 waittime = None
1015 while not result:
1016 if endtime is not None:
1017 waittime = endtime - _time()
1018 if waittime <= 0:
1019 break
1020 self.wait(waittime)
1021 result = predicate()
1022 return result
1023
Benjamin Petersone711caf2008-06-11 16:44:04 +00001024
1025class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +00001026 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +00001028 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001029 def set(self):
1030 return self._callmethod('set')
1031 def clear(self):
1032 return self._callmethod('clear')
1033 def wait(self, timeout=None):
1034 return self._callmethod('wait', (timeout,))
1035
Richard Oudkerk3730a172012-06-15 18:26:07 +01001036
1037class BarrierProxy(BaseProxy):
1038 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1039 def wait(self, timeout=None):
1040 return self._callmethod('wait', (timeout,))
1041 def abort(self):
1042 return self._callmethod('abort')
1043 def reset(self):
1044 return self._callmethod('reset')
1045 @property
1046 def parties(self):
1047 return self._callmethod('__getattribute__', ('parties',))
1048 @property
1049 def n_waiting(self):
1050 return self._callmethod('__getattribute__', ('n_waiting',))
1051 @property
1052 def broken(self):
1053 return self._callmethod('__getattribute__', ('broken',))
1054
1055
Benjamin Petersone711caf2008-06-11 16:44:04 +00001056class NamespaceProxy(BaseProxy):
1057 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1058 def __getattr__(self, key):
1059 if key[0] == '_':
1060 return object.__getattribute__(self, key)
1061 callmethod = object.__getattribute__(self, '_callmethod')
1062 return callmethod('__getattribute__', (key,))
1063 def __setattr__(self, key, value):
1064 if key[0] == '_':
1065 return object.__setattr__(self, key, value)
1066 callmethod = object.__getattribute__(self, '_callmethod')
1067 return callmethod('__setattr__', (key, value))
1068 def __delattr__(self, key):
1069 if key[0] == '_':
1070 return object.__delattr__(self, key)
1071 callmethod = object.__getattribute__(self, '_callmethod')
1072 return callmethod('__delattr__', (key,))
1073
1074
1075class ValueProxy(BaseProxy):
1076 _exposed_ = ('get', 'set')
1077 def get(self):
1078 return self._callmethod('get')
1079 def set(self, value):
1080 return self._callmethod('set', (value,))
1081 value = property(get, set)
1082
1083
1084BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001085 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1086 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001087 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1088 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001089 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001090class ListProxy(BaseListProxy):
1091 def __iadd__(self, value):
1092 self._callmethod('extend', (value,))
1093 return self
1094 def __imul__(self, value):
1095 self._callmethod('__imul__', (value,))
1096 return self
1097
1098
1099DictProxy = MakeProxyType('DictProxy', (
1100 '__contains__', '__delitem__', '__getitem__', '__len__',
1101 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1102 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1103 ))
1104
1105
1106ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001107 '__len__', '__getitem__', '__setitem__'
1108 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001109
1110
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001111BasePoolProxy = MakeProxyType('PoolProxy', (
Benjamin Petersone711caf2008-06-11 16:44:04 +00001112 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001113 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001114 ))
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001115BasePoolProxy._method_to_typeid_ = {
Benjamin Petersone711caf2008-06-11 16:44:04 +00001116 'apply_async': 'AsyncResult',
1117 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001118 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001119 'imap': 'Iterator',
1120 'imap_unordered': 'Iterator'
1121 }
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001122class PoolProxy(BasePoolProxy):
1123 def __enter__(self):
1124 return self
1125 def __exit__(self, exc_type, exc_val, exc_tb):
1126 self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001127
1128#
1129# Definition of SyncManager
1130#
1131
1132class SyncManager(BaseManager):
1133 '''
1134 Subclass of `BaseManager` which supports a number of shared object types.
1135
1136 The types registered are those intended for the synchronization
1137 of threads, plus `dict`, `list` and `Namespace`.
1138
1139 The `multiprocessing.Manager()` function creates started instances of
1140 this class.
1141 '''
1142
1143SyncManager.register('Queue', queue.Queue)
1144SyncManager.register('JoinableQueue', queue.Queue)
1145SyncManager.register('Event', threading.Event, EventProxy)
1146SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1147SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1148SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1149SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1150 AcquirerProxy)
1151SyncManager.register('Condition', threading.Condition, ConditionProxy)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001152SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001153SyncManager.register('Pool', pool.Pool, PoolProxy)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001154SyncManager.register('list', list, ListProxy)
1155SyncManager.register('dict', dict, DictProxy)
1156SyncManager.register('Value', Value, ValueProxy)
1157SyncManager.register('Array', Array, ArrayProxy)
1158SyncManager.register('Namespace', Namespace, NamespaceProxy)
1159
1160# types returned by methods of PoolProxy
1161SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1162SyncManager.register('AsyncResult', create_method=False)