blob: 8e8d28f4b7cdef4fa94fc5986feebde1ff814e02 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
Miss Islington (bot)4bd5fce2018-07-06 05:11:21 -070021import time
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050026from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from . import pool
28from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010030from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032#
Benjamin Petersone711caf2008-06-11 16:44:04 +000033# Register some things for pickling
34#
35
def reduce_array(a):
    '''
    Pickle reducer for `array.array`: rebuild from typecode and raw bytes
    '''
    typecode = a.typecode
    payload = a.tobytes()
    return array.array, (typecode, payload)
# Arrays are pickled via reduce_array().
reduction.register(array.array, reduce_array)

# Concrete types of the objects returned by dict.items()/keys()/values().
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        # Dict views are sent across as plain lists holding the same elements.
        elements = list(obj)
        return list, (elements,)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
47#
48# Type for identifying shared objects
49#
50
class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The pickled state is the plain (typeid, address, id) triple.
        typeid, address, ident = self.typeid, self.address, self.id
        return (typeid, address, ident)

    def __setstate__(self, state):
        # Unpack first so a malformed state fails before any slot is written.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
70#
71# Function for communication with a manager's server process
72#
73
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple (id, methodname, args, kwds).  A '#RETURN'
    reply yields its payload; any other reply kind is converted to an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
83
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into the exception object to raise

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies must carry a str and become
    a RemoteError.  Any other kind yields a ValueError.
    '''
    if kind == '#ERROR':
        return result
    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))
    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))
    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +000098
class RemoteError(Exception):
    '''
    Exception whose first argument is text reported by the manager
    '''
    def __str__(self):
        # Frame the remote text between two 75-dash rules.
        rule = '-'*75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
102
103#
104# Functions for finding the method names of an object
105#
106
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    Every name from dir(obj) whose attribute is callable is included,
    in dir() order.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
117
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
123
124#
125# Server which is run in a process controlled by a manager
126#
127
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods a freshly accepted connection may invoke.
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (initially empty) object tables

        Raises TypeError if `authkey` is not a bytes object.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, runs the single requested `public` method
        and sends back a '#RETURN' or '#TRACEBACK' reply.  The connection
        is closed on every exit path, including when the called method or
        the reply raises SystemExit.
        '''
        funcname = result = request = None
        try:
            try:
                connection.deliver_challenge(c, self.authkey)
                connection.answer_challenge(c, self.authkey)
                request = c.recv()
                ignore, funcname, args, kwds = request
                # Explicit raise rather than `assert`: this whitelist must
                # still be enforced when Python runs with -O.
                if funcname not in self.public:
                    raise AssertionError('%r unrecognized' % funcname)
                func = getattr(self, funcname)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                try:
                    result = func(c, *args, **kwds)
                except Exception:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    msg = ('#RETURN', result)
            try:
                c.send(msg)
            except Exception as e:
                try:
                    c.send(('#TRACEBACK', format_exc()))
                except Exception:
                    pass
                util.info('Failure to send message: %r', msg)
                util.info(' ... request was %r', request)
                util.info(' ... exception was %r', e)
        finally:
            # serve_client() leaves via sys.exit(); SystemExit is not an
            # Exception, so only a finally clause is guaranteed to run.
            c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to objects only referenced from inside the
                    # manager; report the original lookup failure otherwise.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Handlers serve_client() tries when a request ends in AttributeError.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            # Best effort: report the failure but always stop the server.
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(*args, **kwds):
        '''
        Create a new shared object and return its id

        Accepts (self, c, typeid, *args, **kwds); `typeid` and `c` may also
        be given as keywords.  Returns (ident, exposed-method-names tuple).
        '''
        if len(args) >= 3:
            self, c, typeid, *args = args
        elif not args:
            raise TypeError("descriptor 'create' of 'Server' object "
                            "needs an argument")
        else:
            if 'typeid' not in kwds:
                raise TypeError('create expected at least 2 positional '
                                'arguments, got %d' % (len(args)-1))
            typeid = kwds.pop('typeid')
            if len(args) >= 2:
                self, c, *args = args
            else:
                if 'c' not in kwds:
                    raise TypeError('create expected at least 2 positional '
                                    'arguments, got %d' % (len(args)-1))
                c = kwds.pop('c')
                self, *args = args
        args = tuple(args)

        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the (non-reentrant) mutex itself.
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    obj, exposed, gettypeid = self.id_to_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Decrement the reference count of `ident`, disposing of it at zero
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
468
Benjamin Petersone711caf2008-06-11 16:44:04 +0000469
470#
471# Class to represent state of a manager
472#
473
class State(object):
    '''
    Holder for a manager's lifecycle state in its single `value` slot
    '''
    __slots__ = ['value']
    # Lifecycle constants: INITIAL == 0, STARTED == 1, SHUTDOWN == 2.
    INITIAL, STARTED, SHUTDOWN = range(3)
479
480#
481# Mapping from serializer name to Listener and Client types
482#
483
# Each serializer name maps to its (Listener type, Client type) pair.
listener_client = {}
listener_client['pickle'] = (connection.Listener, connection.Client)
listener_client['xmlrpclib'] = (connection.XmlListener, connection.XmlClient)
488
489#
490# Definition of BaseManager
491#
492
class BaseManager(object):
    '''
    Base class for managers
    '''
    _registry = {}
    _Server = Server
    # No server process exists until start() spawns one.  The class-level
    # default keeps join() safe before start() and after connect().
    _process = None

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record connection settings; no server is started or contacted here

        `authkey` defaults to the current process's authkey and `ctx` to
        get_context().
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class's configured server type, as _run_server() does.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # The probe connection is only needed for this one request.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(*args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        self, typeid, *args = args
        args = tuple(args)

        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        if self._process is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        '''
        Start the server if still in the initial state; return the manager
        '''
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: fall through to join/terminate below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # Give this class its own registry copy on first registration.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    # One-shot connection; close it like _create() does.
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
734
735#
# Subclass of set which gets cleared after a fork
737#
738
class ProcessLocalSet(set):
    '''
    Set registered to be cleared by the after-fork hook
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # Pickles as a fresh, empty instance of the same type.
        cls = type(self)
        return cls, ()
744
745#
746# Definition of BaseProxy
747#
748
749class BaseProxy(object):
750 '''
751 A base for proxies of shared objects
752 '''
753 _address_to_local = {}
754 _mutex = util.ForkAwareThreadLock()
755
756 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500757 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100758 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000759 tls_idset = BaseProxy._address_to_local.get(token.address, None)
760 if tls_idset is None:
761 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
762 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000763
764 # self._tls is used to record the connection used by this
765 # thread to communicate with the manager at token.address
766 self._tls = tls_idset[0]
767
768 # self._idset is used to record the identities of all shared
769 # objects for which the current process owns references and
770 # which are in the manager at token.address
771 self._idset = tls_idset[1]
772
773 self._token = token
774 self._id = self._token.id
775 self._manager = manager
776 self._serializer = serializer
777 self._Client = listener_client[serializer][1]
778
Davin Potts86a76682016-09-07 18:48:01 -0500779 # Should be set to True only when a proxy object is being created
780 # on the manager server; primary use case: nested proxy objects.
781 # RebuildProxy detects when a proxy is being created on the manager
782 # and sets this value appropriately.
783 self._owned_by_manager = manager_owned
784
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100786 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000787 elif self._manager is not None:
788 self._authkey = self._manager._authkey
789 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100790 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000791
792 if incref:
793 self._incref()
794
795 util.register_after_fork(self, BaseProxy._after_fork)
796
797 def _connect(self):
798 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100799 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000800 if threading.current_thread().name != 'MainThread':
801 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000802 conn = self._Client(self._token.address, authkey=self._authkey)
803 dispatch(conn, None, 'accept_connection', (name,))
804 self._tls.connection = conn
805
806 def _callmethod(self, methodname, args=(), kwds={}):
807 '''
808 Try to call a method of the referrent and return a copy of the result
809 '''
810 try:
811 conn = self._tls.connection
812 except AttributeError:
813 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000814 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000815 self._connect()
816 conn = self._tls.connection
817
818 conn.send((self._id, methodname, args, kwds))
819 kind, result = conn.recv()
820
821 if kind == '#RETURN':
822 return result
823 elif kind == '#PROXY':
824 exposed, token = result
825 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100826 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000827 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000828 token, self._serializer, manager=self._manager,
829 authkey=self._authkey, exposed=exposed
830 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000831 conn = self._Client(token.address, authkey=self._authkey)
832 dispatch(conn, None, 'decref', (token.id,))
833 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000834 raise convert_to_error(kind, result)
835
    def _getvalue(self):
        '''
        Get a copy of the value of the referent

        Implemented as a remote call of the special method name '#GETVALUE'.
        '''
        return self._callmethod('#GETVALUE')
841
842 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500843 if self._owned_by_manager:
844 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
845 return
846
Benjamin Petersone711caf2008-06-11 16:44:04 +0000847 conn = self._Client(self._token.address, authkey=self._authkey)
848 dispatch(conn, None, 'incref', (self._id,))
849 util.debug('INCREF %r', self._token.id)
850
851 self._idset.add(self._id)
852
853 state = self._manager and self._manager._state
854
855 self._close = util.Finalize(
856 self, BaseProxy._decref,
857 args=(self._token, self._authkey, state,
858 self._tls, self._idset, self._Client),
859 exitpriority=10
860 )
861
862 @staticmethod
863 def _decref(token, authkey, state, tls, idset, _Client):
864 idset.discard(token.id)
865
866 # check whether manager is still alive
867 if state is None or state.value == State.STARTED:
868 # tell manager this process no longer cares about referent
869 try:
870 util.debug('DECREF %r', token.id)
871 conn = _Client(token.address, authkey=authkey)
872 dispatch(conn, None, 'decref', (token.id,))
873 except Exception as e:
874 util.debug('... decref failed %s', e)
875
876 else:
877 util.debug('DECREF %r -- manager already shutdown', token.id)
878
879 # check whether we can close this thread's connection because
880 # the process owns no more references to objects for this manager
881 if not idset and hasattr(tls, 'connection'):
882 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000883 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000884 tls.connection.close()
885 del tls.connection
886
887 def _after_fork(self):
888 self._manager = None
889 try:
890 self._incref()
891 except Exception as e:
892 # the proxy may just be for a manager which has shutdown
893 util.info('incref failed: %s' % e)
894
895 def __reduce__(self):
896 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500897 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898 kwds['authkey'] = self._authkey
899
900 if getattr(self, '_isauto', False):
901 kwds['exposed'] = self._exposed_
902 return (RebuildProxy,
903 (AutoProxy, self._token, self._serializer, kwds))
904 else:
905 return (RebuildProxy,
906 (type(self), self._token, self._serializer, kwds))
907
    def __deepcopy__(self, memo):
        # A deep copy is the value returned by _getvalue(), not another
        # proxy; `memo` is unused.
        return self._getvalue()
910
911 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300912 return '<%s object, typeid %r at %#x>' % \
913 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000914
915 def __str__(self):
916 '''
917 Return representation of the referent (or a fall-back if that fails)
918 '''
919 try:
920 return self._callmethod('__repr__')
921 except Exception:
922 return repr(self)[:-1] + "; '__str__()' failed>"
923
924#
925# Function used for unpickling
926#
927
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    `func` is the proxy constructor recorded at pickling time.  When the
    unpickling process hosts the manager server the token refers to, the
    proxy is marked as manager-owned and the referent is recorded in
    `server.id_to_local_proxy_obj`.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # no incref while the process is flagged as `_inheriting`
    requested = kwds.pop('incref', True)
    inheriting = getattr(current, '_inheriting', False)
    incref = requested and not inheriting
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000944
945#
946# Functions to create proxies and proxy types
947#
948
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Each exposed name becomes a method forwarding to `_callmethod`.  Types
    are memoized in `_cache` (deliberately shared across calls) keyed on
    `(name, exposed)`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    cached = _cache.get(key)
    if cached is not None:
        return cached

    namespace = {}
    for meth in exposed:
        exec('''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
969
970
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None the method names are fetched from the server
    with a 'get_methods' request.  When `authkey` is None it falls back to
    the manager's key, then to the current process's key.

    `manager_owned` is forwarded to the proxy type.  It must be accepted
    here because `RebuildProxy` adds `manager_owned=True` to the keyword
    arguments when a proxy is unpickled inside the manager's own server
    process, and auto-proxies are rebuilt through this function; without
    the parameter that call raised TypeError.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # flag read by BaseProxy.__reduce__ to pickle this as an auto-proxy
    proxy._isauto = True
    return proxy
995
996#
997# Types/callables which we will register with SyncManager
998#
999
class Namespace(object):
    '''
    Simple attribute container; keyword arguments become attributes.
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes starting with '_' are hidden; entries are ordered by
        # their formatted 'name=value' text
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001011
class Value(object):
    '''
    Holder for a single value together with its typecode.

    `lock` is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)'%(cls_name, self._typecode, self._value)

    # attribute-style access to get()/set()
    value = property(get, set)
1023
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialised from `sequence`

    `lock` is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
1026
1027#
1028# Proxy types used by SyncManager
1029#
1030
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator/generator living in the manager process.

    Each method forwards its positional arguments to the referent's method
    of the same name via `_callmethod`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        # the proxy is its own iterator; no remote call is made here
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)
1043
1044
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects exposing `acquire` and `release`; usable in a `with`
    statement.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # timeout is only forwarded when one was actually given
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1056
1057
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable held by the manager.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed.

        The predicate is evaluated locally in the calling process; only the
        waiting is delegated to the remote condition.  Returns the last
        value of the predicate.
        '''
        result = predicate()
        if result:
            return result

        deadline = None if timeout is None else time.monotonic() + timeout
        while not result:
            if deadline is None:
                remaining = None
            else:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            result = predicate()
        return result
1083
Benjamin Petersone711caf2008-06-11 16:44:04 +00001084
class EventProxy(BaseProxy):
    '''
    Proxy for an event object; every method forwards to the referent's
    method of the same name.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
1095
Richard Oudkerk3730a172012-06-15 18:26:07 +01001096
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier object.

    `wait`, `abort` and `reset` forward to the referent; the read-only
    properties are fetched through the referent's `__getattribute__`.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    # each property access below performs a remote call
    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))
    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))
    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1114
1115
class NamespaceProxy(BaseProxy):
    '''
    Proxy forwarding attribute access to the referent.

    Names starting with '_' are handled locally on the proxy object; all
    other names are read, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__delattr__', (key,))
        return object.__delattr__(self, key)
1133
1134
class ValueProxy(BaseProxy):
    '''
    Proxy exposing the referent's `get` and `set` methods, also available
    as the `value` property.
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    # reading or assigning `value` performs a remote get()/set() call
    value = property(get, set)
1142
1143
# Generated proxy type: one forwarding method per name listed below.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    # The in-place operators return the proxy itself (not the remote call's
    # result), so `proxy += x` / `proxy *= n` keep the name bound to the
    # proxy.
    def __iadd__(self, value):
        # implemented with the referent's extend()
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1157
1158
# Generated proxy type: one forwarding method per name listed below.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
# Maps a method name to the typeid of its result; 'Iterator' is the typeid
# registered with IteratorProxy on SyncManager.
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }
Benjamin Petersone711caf2008-06-11 16:44:04 +00001167
1168
# Generated proxy type exposing only length and item get/set.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001172
1173
# Generated proxy type: one forwarding method per name listed below.
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
# Maps a method name to the typeid of its result; both typeids are
# registered on SyncManager.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
class PoolProxy(BasePoolProxy):
    # Context-manager support: leaving the `with` block calls terminate()
    # on the pool.
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001190
1191#
1192# Definition of SyncManager
1193#
1194
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# Arguments are (typeid, callable, proxytype).  Registrations without an
# explicit proxy type (Queue, JoinableQueue, AsyncResult) pass none here.
SyncManager.register('Queue', queue.Queue)
# 'JoinableQueue' is registered with the same callable as 'Queue'
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy (and by DictProxy.__iter__);
# registered with create_method=False
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)