blob: c4dc972b80ebc448ea1428cbce2e8eec4598b8cf [file] [log] [blame]
#
# Module providing the `SyncManager` class for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
Charles-François Natalic8ce7152012-04-17 18:45:57 +020022from time import time as _time
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010026from . import context
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from . import pool
28from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import reduction
30from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010031from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
Benjamin Petersone711caf2008-06-11 16:44:04 +000034# Register some things for pickling
35#
36
def reduce_array(a):
    '''
    Reduce the array `a` to a constructor call for pickling

    Returns a `(callable, args)` pair: calling `array.array` with the type
    code and the raw bytes of `a` rebuilds an equal array.
    '''
    typecode = a.typecode
    payload = a.tobytes()
    return (array.array, (typecode, payload))
# Pickle arrays through reduce_array().
reduction.register(array.array, reduce_array)

# Concrete types of the dict views, in the order items, keys, values.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # views are not plain lists here
    def rebuild_as_list(obj):
        # Reduce a dict view to a call that rebuilds it as a list copy.
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
48#
49# Type for identifying shared objects
50#
51
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is a plain record of three fields: `typeid`, `address` and `id`.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # No instance __dict__ (slots only), so the state is a 3-tuple.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Unpack first so a malformed state leaves the token untouched.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__,
                  self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71#
72# Function for communication with a manager's server process
73#
74
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request over connection `c` and return the manager's answer

    The request is the tuple `(id, methodname, args, kwds)`.  A '#RETURN'
    reply yields its payload; any other reply kind is turned into an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
84
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into an exception instance (not raised here)

    '#ERROR' replies already carry an exception object, which is returned
    unchanged.  '#TRACEBACK' and '#UNSERIALIZABLE' replies carry text and are
    wrapped in RemoteError.  Any other kind yields a ValueError.

    Raises TypeError if a text-carrying reply does not hold a str.
    '''
    if kind == '#ERROR':
        return result
    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        # Explicit check rather than `assert`: assertions vanish under -O.
        if not isinstance(result, str):
            raise TypeError(
                "Result {0!r} (kind '{1}') type is {2}, not str".format(
                    result, kind, type(result)))
        if kind == '#UNSERIALIZABLE':
            return RemoteError('Unserializable message: %s\n' % result)
        return RemoteError(result)
    else:
        return ValueError('Unrecognized message type')
96
class RemoteError(Exception):
    '''
    Exception whose first argument is text reported by the manager

    str() frames that text between two rules of 75 dashes.
    '''
    def __str__(self):
        rule = '-' * 75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
100
101#
102# Functions for finding the method names of an object
103#
104
def all_methods(obj):
    '''
    Return a list of the names in dir(obj) whose attribute is callable
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
115
def public_methods(obj):
    '''
    Return the names from all_methods(obj) that do not begin with '_'
    '''
    visible = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        visible.append(name)
    return visible
121
122#
123# Server which is run in a process controlled by a manager
124#
125
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods a client may invoke through handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (initially empty) object tables

        `registry` maps a typeid to a 4-tuple
        (callable, exposed, method_to_typeid, proxytype); `serializer` is a
        key of `listener_client`.  Raises TypeError if `authkey` is not bytes.
        '''
        # Explicit check rather than `assert`: assertions vanish under -O.
        if not isinstance(authkey, bytes):
            raise TypeError(
                'Authkey {0!r} is type {1!s}, not bytes'.format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener = listener_client[serializer][0]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        # entries stashed for proxies that live inside the server itself
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server until the stop event is set, then exit the process
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one request and calls the named method
        if it is listed in `public`.  The outcome is sent back as a
        '#RETURN' or '#TRACEBACK' message and the connection is then closed.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # This is the access check for remotely callable methods, so it
            # must not be an `assert` (which -O would remove).
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request names an object
        ident and a method; the reply is '#RETURN', '#PROXY', '#ERROR' or
        '#TRACEBACK'.  EOF on the connection ends the thread.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to objects only referenced inside the server;
                    # report the original lookup failure if that fails too.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result is itself shared: answer with a token
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''Return the object itself.'''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''Return str(obj).'''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''Return repr(obj).'''
        return repr(obj)

    # Used by serve_client() when a request raises AttributeError.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''Do nothing.'''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request, then sets the stop event (even if the
        acknowledgement could not be sent).
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of method
        names.  If the registry holds no callable for `typeid`, the single
        positional argument is itself registered as the shared object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                self.registry[typeid]

            if factory is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        'Without callable, must have one non-keyword argument')
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        'Method_to_typeid {0!r}: type {1!s}, not dict'.format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the (non-reentrant) mutex itself, so it has to be
        # called after the lock has been released.
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Serve this connection on the current thread, renamed to `name`
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one reference to the object `ident`

        Raises KeyError if `ident` is neither tracked nor stashed in
        `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Drop one reference to the object `ident`, disposing of it at zero

        Raises KeyError for an unknown `ident` and AssertionError if the
        recorded refcount is not positive.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            # Explicit check rather than `assert`, so it survives -O.
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    'Id {0!s} ({1!r}) has refcount {2:n}, not 1+'.format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
433
Benjamin Petersone711caf2008-06-11 16:44:04 +0000434
435#
436# Class to represent state of a manager
437#
438
class State(object):
    '''
    Holder for a manager's state; `value` is one of the constants below
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = range(3)
444
445#
446# Mapping from serializer name to Listener and Client types
447#
448
# Each serializer name maps to its (Listener, Client) pair.
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
453
454#
455# Definition of BaseManager
456#
457
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record connection parameters; no process is started here

        `authkey` defaults to the current process's authkey and `ctx` to
        get_context().  `serializer` must be a key of `listener_client`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # one-shot connection: release it once the reply has arrived
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If given, `initializer(*initargs)` is called in the new process
        before the server is created.  Raises TypeError if `initializer` is
        neither None nor callable.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists after start(); a manager attached with
        # connect() has nothing to join.
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        # Start the server on entry unless the state has already left INITIAL.
        if self._state.value == State.INITIAL:
            self.start()
        assert self._state.value == State.STARTED
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join/terminate below
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and `method_to_typeid`
        fall back to the proxy type's `_exposed_` and `_method_to_typeid_`
        attributes.  With `create_method` true, a method named `typeid` that
        creates a shared object and returns its proxy is added to `cls`.
        '''
        # Give this class its own registry instead of mutating an inherited one.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Give back one reference now that the proxy exists.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
673
674#
# Subclass of set which gets cleared after a fork
676#
677
class ProcessLocalSet(set):
    '''
    Set that registers an after-fork callback which clears it

    Reducing it yields a call that builds a new, empty instance.
    '''
    def __init__(self):
        def _empty(obj):
            return obj.clear()
        util.register_after_fork(self, _empty)

    def __reduce__(self):
        return (type(self), ())
683
684#
685# Definition of BaseProxy
686#
687
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread-local connection holder, set of owned ids)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Bind the proxy to `token`

        `authkey` falls back to the manager's authkey, then to the current
        process's.  With `incref` true the proxy registers a reference with
        the manager straight away.
        '''
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in the tls
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is returned as a new proxy object; any reply other
        than '#RETURN' or '#PROXY' is raised as an exception.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # Give back one reference now that the proxy exists, over a
            # one-shot connection that is closed afterwards.
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Register a reference with the manager and arm the matching decref
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # one-shot connection: release it once the reply has arrived
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release the reference for `token`; failures are only logged
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        # Drop the manager reference and register a new reference.
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        # The authkey is only included while a child process is being spawned.
        if context.get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        # A deep copy is a copy of the referent's value, not another proxy.
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
862
863#
864# Function used for unpickling
865#
866
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Calls `func(token, serializer, incref=..., **kwds)`.  When the current
    process has a `_manager_server` whose address matches the token, the
    proxy is flagged as manager-owned and the referent's entry is stashed in
    the server's `id_to_local_proxy_obj` table.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # A falsy 'incref' entry is passed on as is; otherwise incref is
    # suppressed while the process is flagged as `_inheriting`.
    incref = kwds.pop('incref', True)
    if incref:
        inheriting = getattr(process.current_process(), '_inheriting', False)
        incref = not inheriting
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883
884#
885# Functions to create proxies and proxy types
886#
887
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    The result is a subclass of `BaseProxy` called `name` with one
    forwarding method per entry of `exposed`.  Types are memoised in
    `_cache` (deliberately a shared mutable default) under the key
    `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    namespace = {}

    # Each method is generated from source so that it is defined under its
    # real name.  NOTE(review): names are interpolated into exec'd code, so
    # `exposed` is presumably expected to come from trusted callers only.
    for meth in exposed:
        exec('''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
908
909
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    `exposed` lists the method names the proxy should offer; when it is
    None the list is fetched with a 'get_methods' request sent to
    `token.address`.  `authkey` falls back to the manager's key and then to
    the current process's key.  `manager`, `authkey`, `incref` and
    `manager_owned` are forwarded to the generated proxy type.

    `manager_owned` is accepted because `RebuildProxy` adds that keyword
    when it rebuilds a proxy inside the owning manager process; without
    the parameter, unpickling an auto-proxy there raised TypeError.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # Marks the proxy so that __reduce__ pickles it via AutoProxy.
    proxy._isauto = True
    return proxy
934
935#
936# Types/callables which we will register with SyncManager
937#
938
class Namespace(object):
    '''
    Plain attribute container: keyword arguments become attributes.
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # Work on a snapshot of the attributes; names starting with an
        # underscore are left out and the rest is listed in sorted order.
        snapshot = list(self.__dict__.items())
        shown = sorted('%s=%r' % (name, value)
                       for name, value in snapshot
                       if not name.startswith('_'))
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000950
class Value(object):
    '''
    Holder for a single value together with its typecode.

    `lock` is accepted by the constructor but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        '''Return the stored value.'''
        return self._value

    def set(self, value):
        '''Replace the stored value.'''
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)'%(cls_name, self._typecode, self._value)

    # Attribute-style access built on the get/set pair above.
    value = property(get, set)
962
def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`; `lock` is accepted but unused.
    '''
    new_array = array.array(typecode, sequence)
    return new_array
965
966#
967# Proxy types used by SyncManager
968#
969
class IteratorProxy(BaseProxy):
    '''
    Proxy exposing `__next__`, `send`, `throw` and `close` of its referent.

    Every call is forwarded through `_callmethod` with the positional
    arguments it was given.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
982
983
class AcquirerProxy(BaseProxy):
    '''
    Proxy exposing `acquire` and `release`; also usable in a `with` block.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when the caller supplied one.
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # Forwards 'acquire' with no arguments and returns its result.
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
995
996
class ConditionProxy(AcquirerProxy):
    '''
    Proxy exposing `wait`, `notify` and `notify_all` on top of the
    `acquire`/`release` pair inherited from `AcquirerProxy`.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have elapsed.

        `predicate` is evaluated locally in the calling process; only the
        waiting itself goes through the proxy's `wait()`.  Returns the last
        value of `predicate()`, which is false if the wait timed out.
        '''
        # The deadline uses a monotonic clock so that adjustments of the
        # system time can neither cut the timeout short nor stretch it.
        from time import monotonic

        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = monotonic() + timeout
        else:
            endtime = None
        waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
1022
Benjamin Petersone711caf2008-06-11 16:44:04 +00001023
class EventProxy(BaseProxy):
    '''
    Proxy exposing `is_set`, `set`, `clear` and `wait` of its referent.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        # `timeout` is always forwarded, including the default None.
        return self._callmethod('wait', (timeout,))
1034
Richard Oudkerk3730a172012-06-15 18:26:07 +01001035
class BarrierProxy(BaseProxy):
    '''
    Proxy exposing `wait`, `abort` and `reset`, plus read-only properties.

    `parties`, `n_waiting` and `broken` are read from the referent on every
    access through its `__getattribute__`.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1053
1054
class NamespaceProxy(BaseProxy):
    '''
    Proxy that forwards attribute access to its referent.

    Names starting with an underscore are handled locally on the proxy
    object; every other name is read, written or deleted on the referent
    through `_callmethod`.  The prefix test uses `str.startswith` so that
    an empty attribute name is forwarded like any other public name
    instead of raising IndexError.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # Fetch `_callmethod` via object.__getattribute__ so the lookup
        # does not go through this class's own attribute hooks.
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1072
1073
class ValueProxy(BaseProxy):
    '''
    Proxy exposing `get` and `set`, also available as the `value` property.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    # Reading or assigning `proxy.value` forwards to get()/set().
    value = property(get, set)
1081
1082
# Generated base type: one forwarding method per exposed name.
BaseListProxy = MakeProxyType('BaseListProxy', (
    # operators and item access
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    # regular list methods
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort',
    # in-place repeat; wrapped again by ListProxy below
    '__imul__'
    ))


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.

    `+=` and `*=` rebind the left-hand name to the method's return value,
    so both methods forward the operation and then return `self`.
    '''
    def __iadd__(self, value):
        # `+=` is forwarded as an 'extend' call.
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1096
1097
# Proxy type for dict referents.  'has_key' is intentionally not exposed:
# Python 3 dicts have no such method, so a generated `has_key` forwarder
# could only ever fail when called.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
1103
1104
# Proxy type for array referents: length plus item get/set only.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
    )
Benjamin Petersone711caf2008-06-11 16:44:04 +00001108
1109
# Generated base type holding the forwarding methods for pool referents.
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))

# Maps a method name to the typeid associated with that method's result.
BasePoolProxy._method_to_typeid_ = dict(
    apply_async='AsyncResult',
    map_async='AsyncResult',
    starmap_async='AsyncResult',
    imap='Iterator',
    imap_unordered='Iterator',
    )


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block terminates
    the pool.
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001126
1127#
1128# Definition of SyncManager
1129#
1130
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The registered types are the ones intended for synchronizing threads,
    together with `dict`, `list` and `Namespace`.  The class body itself is
    empty: the types are attached by `SyncManager.register()` calls made
    at module level.

    Started instances of this class are what the
    `multiprocessing.Manager()` function returns.
    '''
1141
# Queues: registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# Synchronization primitives from `threading`.
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)

# Worker pool.
SyncManager.register('Pool', pool.Pool, PoolProxy)

# Containers and simple value holders.
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)