blob: c6722771b05c94f97469ed3db14250f99142fd45 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
Charles-François Natalic8ce7152012-04-17 18:45:57 +020022from time import time as _time
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
Davin Potts54586472016-09-09 18:03:10 -050026from .context import reduction, get_spawning_popen
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from . import pool
28from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010030from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032#
Benjamin Petersone711caf2008-06-11 16:44:04 +000033# Register some things for pickling
34#
35
def reduce_array(a):
    '''
    Pickle-reduction helper for `array.array` objects

    Returns the constructor together with the typecode and raw bytes
    needed to rebuild an equal array.
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args

# Teach the multiprocessing pickler how to serialize arrays.
reduction.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
# Types of the objects returned by dict.items(), dict.keys(), dict.values()
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]

if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        '''
        Reduce a dict view to a plain list holding the same elements
        '''
        return list, (list(obj),)

    # Register the same reducer for every dict view type.
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
47#
48# Type for identifying shared objects
49#
50
class Token(object):
    '''
    Type to uniquely identify a shared object

    Holds three fields: `typeid`, `address` and `id`.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The pickled state is simply the three fields as a tuple.
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
70#
71# Function for communication with a manager's server process
72#
73
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request over connection `c` and return the manager's response

    The request is the tuple `(id, methodname, args, kwds)`.  A reply of
    kind '#RETURN' yields its payload; any other kind is converted to an
    exception by `convert_to_error` and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
83
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into an exception object (not raised here)

    '#ERROR' payloads are returned unchanged; '#TRACEBACK' and
    '#UNSERIALIZABLE' payloads are wrapped in RemoteError; any other kind
    gives a ValueError.  Raises TypeError when a traceback-style payload
    is not a str.
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +000098
class RemoteError(Exception):
    '''
    Exception whose first argument is text shown between two rules
    '''
    def __str__(self):
        rule = '-' * 75
        return ''.join(('\n', rule, '\n', str(self.args[0]), rule))
102
103#
104# Functions for finding the method names of an object
105#
106
def all_methods(obj):
    '''
    Return a list of the names in `dir(obj)` whose attribute is callable
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
117
def public_methods(obj):
    '''
    Return the names from `all_methods(obj)` that do not begin with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
123
124#
125# Server which is run in a process controlled by a manager
126#
127
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    Shared objects are stored in `id_to_obj`, keyed by a hexadecimal string
    derived from `id(obj)`; `id_to_refcount` tracks how many references to
    each object have been registered through `incref`.
    '''
    # Names of the methods that handle_request() is willing to call.
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener for `address`

        Raises TypeError if `authkey` is not a bytes object.
        '''
        # An explicit check (rather than `assert`) so that it is still
        # enforced when Python runs with -O.
        if not isinstance(authkey, bytes):
            raise TypeError(
                'Authkey {0!r} is type {1!s}, not bytes'.format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener = listener_client[serializer][0]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Starts a daemon thread running accepter(), waits until `stop_event`
        is set (or KeyboardInterrupt/SystemExit is raised), restores
        sys.stdout/sys.stderr if they were replaced and calls sys.exit(0).
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each in a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one `(ignored, funcname, args, kwds)`
        request, calls the named public method with `c` as first argument
        and sends back ('#RETURN', result) or ('#TRACEBACK', text).
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Only names listed in `public` may be looked up with getattr();
            # raised explicitly so the check is not stripped under -O.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # Best effort: try to report the send failure to the peer.
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `stop_event` is set.  Each request is a tuple
        `(ident, methodname, args, kwds)`; the reply is one of '#RETURN',
        '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.  The thread
        exits via sys.exit() on EOF or when a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to objects only referenced from inside the
                    # manager; report the original KeyError if that fails.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result is to be shared by proxy, not by value.
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Fallback for '#GETVALUE': return the referent itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Fallback for '__str__': return str() of the referent
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Fallback for '__repr__': return repr() of the referent
        '''
        return repr(obj)

    # Used by serve_client() when a requested method is not exposed; the
    # values are plain functions and are called with `self` passed explicitly.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount.keys()):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c` and then sets `stop_event`, even if
        sending the acknowledgement failed.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of method
        names.  When the registry has no callable for `typeid`, the single
        positional argument is itself registered as the shared object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                if len(args) != 1 or kwds:
                    raise AssertionError(
                        'Without callable, must have one non-keyword '
                        'argument')
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if type(method_to_typeid) is not dict:
                    raise AssertionError(
                        'Method_to_typeid {0!r}: type {1!s}, not dict'.format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # Called after the lock is released: incref() acquires self.mutex.
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`

        Raises KeyError if `ident` is tracked neither in `id_to_refcount`
        nor in `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident`

        When the count reaches zero the object is dropped from
        `id_to_refcount` and `id_to_obj`.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            # Explicit raise so the sanity check survives -O.
            if self.id_to_refcount[ident] < 1:
                raise AssertionError(
                    'Id {0!s} refcount {1:n} is below 1'.format(
                        ident, self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
435
Benjamin Petersone711caf2008-06-11 16:44:04 +0000436
437#
438# Class to represent state of a manager
439#
440
class State(object):
    '''
    Holder for a manager's state in its single `value` attribute
    '''
    __slots__ = ['value']

    # Possible settings of `value`
    INITIAL, STARTED, SHUTDOWN = range(3)
446
447#
448# Mapping from serializer name to Listener and Client types
449#
450
# Each serializer name maps to a (Listener type, Client type) pair.
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
455
456#
457# Definition of BaseManager
458#
459
class BaseManager(object):
    '''
    Base class for managers

    `_registry` maps each registered typeid to the tuple
    `(callable, exposed, method_to_typeid, proxytype)`; `_Server` is the
    server class instantiated for this manager type.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection parameters; no process is started here

        `authkey` defaults to the current process's authkey and `ctx` to
        the context returned by `get_context()`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # Use the class attribute so this agrees with _run_server().
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # The connection is only needed for this one request.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        Raises TypeError if `initializer` is given but is not callable.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` is only assigned by start(), so it may be missing.
        spawned = getattr(self, '_process', None)
        if spawned is not None:
            spawned.join(timeout)
            if not spawned.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        if self._state.value == State.INITIAL:
            self.start()
        assert self._state.value == State.STARTED
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: the process is joined/terminated below anyway.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        When `create_method` is true, a method named `typeid` is added to
        the class; calling it creates a shared object on the server and
        returns a proxy for it.
        '''
        # Give this class its own registry rather than mutating an
        # inherited one.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Release the reference taken by the server's create();
                # close the one-shot connection afterwards.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
677
678#
# Subclass of set which gets cleared after a fork
680#
681
class ProcessLocalSet(set):
    '''
    A set that registers itself to be cleared after a fork

    Its reduction carries no elements: it rebuilds as an empty instance.
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        return type(self), ()
687
688#
689# Definition of BaseProxy
690#
691
692class BaseProxy(object):
693 '''
694 A base for proxies of shared objects
695 '''
696 _address_to_local = {}
697 _mutex = util.ForkAwareThreadLock()
698
699 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500700 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100701 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000702 tls_idset = BaseProxy._address_to_local.get(token.address, None)
703 if tls_idset is None:
704 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
705 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000706
707 # self._tls is used to record the connection used by this
708 # thread to communicate with the manager at token.address
709 self._tls = tls_idset[0]
710
711 # self._idset is used to record the identities of all shared
712 # objects for which the current process owns references and
713 # which are in the manager at token.address
714 self._idset = tls_idset[1]
715
716 self._token = token
717 self._id = self._token.id
718 self._manager = manager
719 self._serializer = serializer
720 self._Client = listener_client[serializer][1]
721
Davin Potts86a76682016-09-07 18:48:01 -0500722 # Should be set to True only when a proxy object is being created
723 # on the manager server; primary use case: nested proxy objects.
724 # RebuildProxy detects when a proxy is being created on the manager
725 # and sets this value appropriately.
726 self._owned_by_manager = manager_owned
727
Benjamin Petersone711caf2008-06-11 16:44:04 +0000728 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100729 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000730 elif self._manager is not None:
731 self._authkey = self._manager._authkey
732 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100733 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000734
735 if incref:
736 self._incref()
737
738 util.register_after_fork(self, BaseProxy._after_fork)
739
740 def _connect(self):
741 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100742 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000743 if threading.current_thread().name != 'MainThread':
744 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000745 conn = self._Client(self._token.address, authkey=self._authkey)
746 dispatch(conn, None, 'accept_connection', (name,))
747 self._tls.connection = conn
748
749 def _callmethod(self, methodname, args=(), kwds={}):
750 '''
751 Try to call a method of the referrent and return a copy of the result
752 '''
753 try:
754 conn = self._tls.connection
755 except AttributeError:
756 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000757 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000758 self._connect()
759 conn = self._tls.connection
760
761 conn.send((self._id, methodname, args, kwds))
762 kind, result = conn.recv()
763
764 if kind == '#RETURN':
765 return result
766 elif kind == '#PROXY':
767 exposed, token = result
768 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100769 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000770 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000771 token, self._serializer, manager=self._manager,
772 authkey=self._authkey, exposed=exposed
773 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000774 conn = self._Client(token.address, authkey=self._authkey)
775 dispatch(conn, None, 'decref', (token.id,))
776 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000777 raise convert_to_error(kind, result)
778
779 def _getvalue(self):
780 '''
781 Get a copy of the value of the referent
782 '''
783 return self._callmethod('#GETVALUE')
784
785 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500786 if self._owned_by_manager:
787 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
788 return
789
Benjamin Petersone711caf2008-06-11 16:44:04 +0000790 conn = self._Client(self._token.address, authkey=self._authkey)
791 dispatch(conn, None, 'incref', (self._id,))
792 util.debug('INCREF %r', self._token.id)
793
794 self._idset.add(self._id)
795
796 state = self._manager and self._manager._state
797
798 self._close = util.Finalize(
799 self, BaseProxy._decref,
800 args=(self._token, self._authkey, state,
801 self._tls, self._idset, self._Client),
802 exitpriority=10
803 )
804
805 @staticmethod
806 def _decref(token, authkey, state, tls, idset, _Client):
807 idset.discard(token.id)
808
809 # check whether manager is still alive
810 if state is None or state.value == State.STARTED:
811 # tell manager this process no longer cares about referent
812 try:
813 util.debug('DECREF %r', token.id)
814 conn = _Client(token.address, authkey=authkey)
815 dispatch(conn, None, 'decref', (token.id,))
816 except Exception as e:
817 util.debug('... decref failed %s', e)
818
819 else:
820 util.debug('DECREF %r -- manager already shutdown', token.id)
821
822 # check whether we can close this thread's connection because
823 # the process owns no more references to objects for this manager
824 if not idset and hasattr(tls, 'connection'):
825 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000826 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000827 tls.connection.close()
828 del tls.connection
829
830 def _after_fork(self):
831 self._manager = None
832 try:
833 self._incref()
834 except Exception as e:
835 # the proxy may just be for a manager which has shutdown
836 util.info('incref failed: %s' % e)
837
838 def __reduce__(self):
839 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500840 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000841 kwds['authkey'] = self._authkey
842
843 if getattr(self, '_isauto', False):
844 kwds['exposed'] = self._exposed_
845 return (RebuildProxy,
846 (AutoProxy, self._token, self._serializer, kwds))
847 else:
848 return (RebuildProxy,
849 (type(self), self._token, self._serializer, kwds))
850
851 def __deepcopy__(self, memo):
852 return self._getvalue()
853
854 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300855 return '<%s object, typeid %r at %#x>' % \
856 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000857
858 def __str__(self):
859 '''
860 Return representation of the referent (or a fall-back if that fails)
861 '''
862 try:
863 return self._callmethod('__repr__')
864 except Exception:
865 return repr(self)[:-1] + "; '__str__()' failed>"
866
867#
868# Function used for unpickling
869#
870
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    `func` is called as `func(token, serializer, incref=..., **kwds)` and
    its result is returned.  When the current process runs the manager
    server that owns `token`, the proxy is flagged as manager-owned and
    the referent is recorded in the server's local-proxy table.
    '''
    this_process = process.current_process()
    server = getattr(this_process, '_manager_server', None)

    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # No incref is taken when the caller disabled it or while the current
    # process is flagged as `_inheriting`.
    requested = kwds.pop('incref', True)
    incref = requested and not getattr(this_process, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000887
888#
889# Functions to create proxies and proxy types
890#
891
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` (a deliberately shared default
    argument), keyed by `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # Generate one forwarding method per exposed name; each simply relays
    # the call through `_callmethod`.
    namespace = {}
    for meth in exposed:
        source = '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth)
        exec(source, namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
912
913
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    token         -- token identifying the referent
    serializer    -- key into `listener_client` selecting the client type
    manager       -- optional manager; supplies the authkey if none given
    authkey       -- optional authentication key
    exposed       -- method names to expose; when None they are fetched
                     from the manager with a 'get_methods' request
    incref        -- passed through to the proxy constructor
    manager_owned -- passed through to the proxy constructor.  RebuildProxy
                     adds this keyword when a proxy is unpickled inside the
                     manager server; without the parameter, rebuilding an
                     auto-proxy there fails with TypeError.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    proxy._isauto = True
    return proxy
938
939#
940# Types/callables which we will register with SyncManager
941#
942
class Namespace(object):
    '''
    Simple attribute container; keyword arguments become attributes.
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # Work on a snapshot of the attributes; names starting with an
        # underscore are left out and the rest are sorted.
        snapshot = list(self.__dict__.items())
        shown = sorted('%s=%r' % (name, value)
                       for name, value in snapshot
                       if not name.startswith('_'))
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000954
class Value(object):
    '''
    Holder for a single value together with its typecode.

    The `lock` argument is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        parts = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % parts

    # attribute-style access to the stored value
    value = property(get, set)
966
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` built from `typecode` and `sequence`.

    The `lock` argument is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
969
970#
971# Proxy types used by SyncManager
972#
973
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator or generator; each call is forwarded to the
    referent through `_callmethod`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
986
987
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire()` and `release()` methods; also
    usable as a context manager.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when one was actually given
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
999
1000
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable.

    `wait`, `notify` and `notify_all` are forwarded to the referent;
    `wait_for` is implemented locally on top of `wait`.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed.

        Returns the last value returned by `predicate`.  The predicate is
        evaluated in the calling process, not in the manager.
        '''
        # A monotonic clock keeps the deadline correct even if the system
        # wall clock is adjusted while waiting.
        from time import monotonic

        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = monotonic() + timeout
        else:
            endtime = None
        waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
1026
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027
class EventProxy(BaseProxy):
    '''
    Proxy for an event object; every method is forwarded to the referent.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
1038
Richard Oudkerk3730a172012-06-15 18:26:07 +01001039
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier object.

    The read-only attributes `parties`, `n_waiting` and `broken` are
    fetched from the referent via its `__getattribute__`.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1057
1058
class NamespaceProxy(BaseProxy):
    '''
    Proxy that forwards attribute access to the referent.

    Names starting with an underscore are handled locally on the proxy
    object itself; all other names are read, set or deleted on the
    referent through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    # `startswith` is used instead of `key[0]` so that an empty attribute
    # name is forwarded like any other public name rather than raising
    # IndexError here.

    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1076
1077
class ValueProxy(BaseProxy):
    '''
    Proxy exposing the referent's `get()` and `set()` methods, plus a
    `value` property built on them.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    # attribute-style access: reads call get(), writes call set()
    value = property(get, set)
1085
1086
# Generated proxy type for lists; each name below becomes a method that
# forwards to the referent.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself, so that
    `proxy += x` and `proxy *= n` leave the name bound to the proxy.
    '''
    def __iadd__(self, value):
        # in-place add is carried out as an `extend` on the referent
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1100
1101
# Generated proxy type for dicts.  `__iter__` is exposed so that iterating
# over the proxy asks the referent for an iterator; without it, iteration
# falls back to `__getitem__` with 0, 1, 2, ... and fails with KeyError.
# 'has_key' is kept only so the existing attribute does not disappear.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
# The iterator produced by `__iter__` is handed back as a proxy of the
# 'Iterator' typeid (registered with SyncManager) rather than by value.
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }
1107
1108
# Generated proxy type for arrays: length plus item get/set only.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001112
1113
# Generated proxy type for pools (note the type is named 'PoolProxy').
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# Methods whose results are returned as proxies of the given typeid.
BasePoolProxy._method_to_typeid_ = {
    # asynchronous calls
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    # lazy maps
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the `with` block
    calls `terminate()`.
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001130
1131#
1132# Definition of SyncManager
1133#
1134
class SyncManager(BaseManager):
    '''
    A `BaseManager` subclass with a set of shared object types
    registered on it.

    The registered types are the thread synchronization primitives,
    together with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function returns.
    '''
1145
# queue types are registered without an explicit proxy type
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# (typeid, callable, proxy type) for every type with a dedicated proxy,
# registered in the order listed
for _typeid, _callable, _proxytype in (
        ('Event', threading.Event, EventProxy),
        ('Lock', threading.Lock, AcquirerProxy),
        ('RLock', threading.RLock, AcquirerProxy),
        ('Semaphore', threading.Semaphore, AcquirerProxy),
        ('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy),
        ('Condition', threading.Condition, ConditionProxy),
        ('Barrier', threading.Barrier, BarrierProxy),
        ('Pool', pool.Pool, PoolProxy),
        ('list', list, ListProxy),
        ('dict', dict, DictProxy),
        ('Value', Value, ValueProxy),
        ('Array', Array, ArrayProxy),
        ('Namespace', Namespace, NamespaceProxy),
        ):
    SyncManager.register(_typeid, _callable, _proxytype)
# do not leave the loop variables behind in the module namespace
del _typeid, _callable, _proxytype

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)