blob: 04df26bac661871d7603b8046c842726991376a3 [file] [log] [blame]
#
# Module providing the `SyncManager` class for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
10
# Names exported by `from multiprocessing.managers import *`.
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
#
# Imports
#
16
import sys
import threading
import array
import queue

from time import time as _time
from traceback import format_exc

from . import connection
from .context import reduction, get_spawning_popen, ProcessError
from . import pool
from . import process
from . import util
from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000031
#
# Register some things for pickling
#
35
def reduce_array(a):
    '''
    Reduction function for `array.array` objects

    Returns the `(callable, args)` pair used for pickling: calling
    `array.array(typecode, raw_bytes)` rebuilds an equal array.
    '''
    return array.array, (a.typecode, a.tobytes())
reduction.register(array.array, reduce_array)

# Concrete types returned by dict.items(), dict.keys() and dict.values().
view_types = [type(getattr({}, name)()) for name in ('items', 'keys', 'values')]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        # Dict views are sent across as plain lists of their contents.
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
#
# Type for identifying shared objects
#
50
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is the triple (typeid, address, id).  Its pickled state is
    exactly that tuple (see `__getstate__` / `__setstate__`).
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        (self.typeid, self.address, self.id) = (typeid, address, id)

    def __getstate__(self):
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        return '%s(typeid=%r, address=%r, id=%r)' % \
               (self.__class__.__name__, self.typeid, self.address, self.id)
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
#
# Function for communication with a manager's server process
#
73
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple (id, methodname, args, kwds); the reply is a
    (kind, result) pair.  A '#RETURN' reply yields `result`; any other kind
    is turned into an exception by `convert_to_error()` and raised.
    '''
    # NOTE: the shared default `kwds` dict is only sent, never mutated here.
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)
83
def convert_to_error(kind, result):
    '''
    Convert a non-'#RETURN' reply into the exception object to raise

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies carry text that is wrapped
    in a `RemoteError`; a TypeError is raised if that payload is not a str.
    Any other kind produces a ValueError (returned, not raised).
    '''
    if kind == '#ERROR':
        return result
    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))
    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))
    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +000098
class RemoteError(Exception):
    '''
    Exception wrapping text received from the manager's server

    `str()` frames the first argument between two rules of 75 dashes.
    '''
    def __str__(self):
        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
102
#
# Functions for finding the method names of an object
#
106
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    Every name reported by `dir(obj)` whose attribute is callable is
    included, in `dir()` order.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
117
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    return [name for name in all_methods(obj) if not name.startswith('_')]
123
#
# Server which is run in a process controlled by a manager
#
127
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    `registry` maps a typeid to the 4-tuple
    (callable, exposed, method_to_typeid, proxytype) consumed by `create()`.
    Shared objects are stored in `id_to_obj` under a hex-string ident as
    (obj, exposed, method_to_typeid); their external reference counts are
    kept in `id_to_refcount`.  Updates to both are serialized by `mutex`.
    '''
    # Names of the methods a client may invoke through handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Bind a listener on `address`; raises TypeError if `authkey` is
        not a bytes object.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Starts the accepter thread, then blocks until `stop_event` is set
        (or KeyboardInterrupt/SystemExit arrives) and finally exits the
        process with status 0.
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each in its own daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one (ignored, funcname, args, kwds)
        request, runs the named public method and sends back either
        ('#RETURN', result) or ('#TRACEBACK', text).
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            if funcname not in self.public:
                # Raised explicitly rather than with `assert` so that the
                # whitelist is still enforced when Python runs with -O.
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # Best effort: tell the client why the reply could not be sent.
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `stop_event` is set, reading
        (ident, methodname, args, kwds) requests from `conn` and replying
        with a (kind, payload) message.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to objects only referenced from inside the
                    # manager; report the original lookup failure if the
                    # ident is unknown there too.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result must itself be shared: register it and
                        # send back what is needed to build a proxy.
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Handlers tried by serve_client() when the normal dispatch of a
    # request raises AttributeError.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            # Deliberately broad: a failed acknowledgement must not stop
            # the shutdown, so report it and carry on.
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns (ident, exposed) where `exposed` is a tuple of method names.
        Raises ValueError if the typeid has no callable and the arguments
        are not exactly one positional object, and TypeError if the
        registered method_to_typeid is not a dict.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                self.registry[typeid]

            if factory is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the (non-reentrant) mutex itself, so it has to be
        # called after the lock has been released.
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one external reference to the object `ident`

        Raises KeyError if the ident is tracked neither externally nor as
        a manager-internal proxy object.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise

    def decref(self, c, ident):
        '''
        Drop one external reference to `ident`, disposing of the object
        when the count reaches zero

        Raises AssertionError if the recorded refcount is not positive.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
448
Benjamin Petersone711caf2008-06-11 16:44:04 +0000449
#
# Class to represent state of a manager
#
453
class State(object):
    '''
    Holder for a manager's lifecycle state

    The single slot `value` holds one of INITIAL, STARTED or SHUTDOWN.
    '''
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2
459
#
# Mapping from serializer name to Listener and Client types
#
463
# serializer name -> (Listener type, Client type)
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
468
#
# Definition of BaseManager
#
472
class BaseManager(object):
    '''
    Base class for managers

    `_registry` maps each registered typeid to the 4-tuple
    (callable, exposed, method_to_typeid, proxytype) stored by `register()`.
    `_Server` is the server class instantiated for the manager.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        `authkey` defaults to the current process's authkey and `ctx` to
        the result of `get_context()`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is still in its initial state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the `_Server` hook, as _run_server() does, so that a subclass
        # overriding it gets the same server class on both paths.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # One-shot connection: release it like the other helpers do.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is given, the server process calls
        `initializer(*initargs)` before creating the server.  Raises
        ProcessError unless the manager is in its initial state and
        TypeError if `initializer` is not callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        if self._state.value != State.STARTED:
            # Explicit raise (same exception type as the former `assert`)
            # so the check is not stripped when Python runs with -O.
            raise AssertionError('server not yet started')
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists once start() has run; treat a manager that
        # was never started the same as one whose process is already gone.
        if getattr(self, '_process', None) is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: fall through to join/terminate below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy; `exposed` and `method_to_typeid`
        default to the proxy type's `_exposed_` / `_method_to_typeid_`
        attributes.  With `create_method` true, a method named `typeid` is
        added to the class which creates a shared object and returns a
        proxy for it.  Raises AssertionError if a key or value of
        `method_to_typeid` is not exactly a str.
        '''
        # Give each manager class its own registry on first registration
        # instead of mutating the one inherited from its base class.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            # Explicit raises (not `assert`) so validation survives -O.
            for key, value in method_to_typeid.items():
                if type(key) is not str:
                    raise AssertionError('%r is not a string' % key)
                if type(value) is not str:
                    raise AssertionError('%r is not a string' % value)

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # The server counted a reference in create(); release it
                # now that the proxy has been constructed.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
711
#
# Subclass of set which gets cleared after a fork
#
715
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork

    Pickling never transfers the contents: `__reduce__` rebuilds a fresh,
    empty instance of the same type.
    '''
    def __init__(self):
        util.register_after_fork(self, lambda obj: obj.clear())

    def __reduce__(self):
        return type(self), ()
721
#
# Definition of BaseProxy
#
725
726class BaseProxy(object):
727 '''
728 A base for proxies of shared objects
729 '''
730 _address_to_local = {}
731 _mutex = util.ForkAwareThreadLock()
732
733 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500734 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100735 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000736 tls_idset = BaseProxy._address_to_local.get(token.address, None)
737 if tls_idset is None:
738 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
739 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000740
741 # self._tls is used to record the connection used by this
742 # thread to communicate with the manager at token.address
743 self._tls = tls_idset[0]
744
745 # self._idset is used to record the identities of all shared
746 # objects for which the current process owns references and
747 # which are in the manager at token.address
748 self._idset = tls_idset[1]
749
750 self._token = token
751 self._id = self._token.id
752 self._manager = manager
753 self._serializer = serializer
754 self._Client = listener_client[serializer][1]
755
Davin Potts86a76682016-09-07 18:48:01 -0500756 # Should be set to True only when a proxy object is being created
757 # on the manager server; primary use case: nested proxy objects.
758 # RebuildProxy detects when a proxy is being created on the manager
759 # and sets this value appropriately.
760 self._owned_by_manager = manager_owned
761
Benjamin Petersone711caf2008-06-11 16:44:04 +0000762 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100763 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000764 elif self._manager is not None:
765 self._authkey = self._manager._authkey
766 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100767 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000768
769 if incref:
770 self._incref()
771
772 util.register_after_fork(self, BaseProxy._after_fork)
773
774 def _connect(self):
775 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100776 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000777 if threading.current_thread().name != 'MainThread':
778 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779 conn = self._Client(self._token.address, authkey=self._authkey)
780 dispatch(conn, None, 'accept_connection', (name,))
781 self._tls.connection = conn
782
783 def _callmethod(self, methodname, args=(), kwds={}):
784 '''
785 Try to call a method of the referrent and return a copy of the result
786 '''
787 try:
788 conn = self._tls.connection
789 except AttributeError:
790 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000791 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000792 self._connect()
793 conn = self._tls.connection
794
795 conn.send((self._id, methodname, args, kwds))
796 kind, result = conn.recv()
797
798 if kind == '#RETURN':
799 return result
800 elif kind == '#PROXY':
801 exposed, token = result
802 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100803 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000804 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000805 token, self._serializer, manager=self._manager,
806 authkey=self._authkey, exposed=exposed
807 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000808 conn = self._Client(token.address, authkey=self._authkey)
809 dispatch(conn, None, 'decref', (token.id,))
810 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000811 raise convert_to_error(kind, result)
812
813 def _getvalue(self):
814 '''
815 Get a copy of the value of the referent
816 '''
817 return self._callmethod('#GETVALUE')
818
def _incref(self):
    '''
    Send an 'incref' request for this proxy's id to the manager, record
    the id locally and install a finalizer which calls `_decref`.

    Nothing is done when `_owned_by_manager` is set.
    '''
    if self._owned_by_manager:
        util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
        return

    # a fresh connection is opened just for the incref request
    incref_conn = self._Client(self._token.address, authkey=self._authkey)
    dispatch(incref_conn, None, 'incref', (self._id,))
    util.debug('INCREF %r', self._token.id)

    self._idset.add(self._id)

    # `state` stays falsy when the proxy has no manager attached
    manager = self._manager
    state = manager and manager._state

    finalizer_args = (self._token, self._authkey, state,
                      self._tls, self._idset, self._Client)
    self._close = util.Finalize(
        self, BaseProxy._decref, args=finalizer_args, exitpriority=10
        )
838
@staticmethod
def _decref(token, authkey, state, tls, idset, _Client):
    '''
    Forget `token.id` locally, send a 'decref' request to the manager
    unless `state` says it is no longer started, and close this thread's
    connection once `idset` is empty.

    Failures while sending the request are only logged.
    '''
    idset.discard(token.id)

    # no state object at all is treated the same as a started manager
    manager_alive = state is None or state.value == State.STARTED
    if not manager_alive:
        util.debug('DECREF %r -- manager already shutdown', token.id)
    else:
        # tell manager this process no longer cares about referent
        try:
            util.debug('DECREF %r', token.id)
            decref_conn = _Client(token.address, authkey=authkey)
            dispatch(decref_conn, None, 'decref', (token.id,))
        except Exception as exc:
            util.debug('... decref failed %s', exc)

    # the thread's connection is only closed when no ids remain for
    # this manager and the thread actually has a connection
    if idset or not hasattr(tls, 'connection'):
        return

    util.debug('thread %r has no more proxies so closing conn',
               threading.current_thread().name)
    tls.connection.close()
    del tls.connection
863
def _after_fork(self):
    '''
    Clear the manager reference and take a new reference via `_incref`;
    a failing incref is logged instead of being raised
    '''
    self._manager = None
    try:
        self._incref()
    except Exception as exc:
        # the proxy may just be for a manager which has shutdown
        util.info('incref failed: %s' % exc)
871
def __reduce__(self):
    '''
    Pickle support: return `RebuildProxy` together with the arguments
    needed to recreate this proxy.

    The authkey is only included while `get_spawning_popen()` returns
    something, and auto-proxies are rebuilt through `AutoProxy` with
    their exposed methods passed along.
    '''
    extra = {}
    if get_spawning_popen() is not None:
        extra['authkey'] = self._authkey

    if not getattr(self, '_isauto', False):
        return (RebuildProxy,
                (type(self), self._token, self._serializer, extra))

    extra['exposed'] = self._exposed_
    return (RebuildProxy,
            (AutoProxy, self._token, self._serializer, extra))
884
def __deepcopy__(self, memo):
    '''
    Deep-copying a proxy returns the result of `_getvalue()`;
    `memo` is not consulted
    '''
    copied = self._getvalue()
    return copied
887
def __repr__(self):
    '''
    Describe the proxy itself: its class name, the token's typeid and
    the proxy object's id
    '''
    fields = (type(self).__name__, self._token.typeid, id(self))
    return '<%s object, typeid %r at %#x>' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +0000891
def __str__(self):
    '''
    Return the referent's repr as obtained through `_callmethod`, or a
    variant of the proxy's own repr if that call fails
    '''
    try:
        text = self._callmethod('__repr__')
    except Exception:
        # strip the closing '>' so the failure note lands inside the brackets
        text = repr(self)[:-1] + "; '__str__()' failed>"
    return text
900
901#
902# Function used for unpickling
903#
904
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When the current process has a `_manager_server` listening on the
    token's address, the proxy is marked as manager-owned and the
    referent is recorded in the server's `id_to_local_proxy_obj` map.
    The rebuilt proxy only increfs if requested via `kwds` and the
    current process is not flagged as `_inheriting`.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)

    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # 'incref' is removed from kwds because it is passed explicitly below
    requested = kwds.pop('incref', True)
    incref = requested and not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000921
922#
923# Functions to create proxies and proxy types
924#
925
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a `BaseProxy` subclass called `name` with one forwarding
    method per entry of `exposed`.

    Types are memoised in `_cache` keyed on `(name, tuple(exposed))`, so
    the shared default dict is intentional.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # each exposed name becomes a method which forwards to _callmethod
    template = '''def %s(self, *args, **kwds):
    return self._callmethod(%r, args, kwds)'''
    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
946
947
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    `serializer` selects the client used to talk to the manager.  When
    `exposed` is None the list of methods is requested from the manager
    with a 'get_methods' call.  `authkey` falls back to the manager's
    key and then to the current process's key.  `manager`, `incref` and
    `manager_owned` are forwarded to the generated proxy type.

    `manager_owned` is accepted because `RebuildProxy` adds that keyword
    when unpickling inside the manager's own server process.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before opening any connection so that the
    # 'get_methods' query uses the same key as the resulting proxy.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marks the proxy so that pickling rebuilds it through AutoProxy
    proxy._isauto = True
    return proxy
972
973#
974# Types/callables which we will register with SyncManager
975#
976
class Namespace(object):
    '''
    Simple attribute holder whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # names starting with an underscore are left out of the repr
        snapshot = list(self.__dict__.items())
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in snapshot
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000988
class Value(object):
    '''
    Holder for a single value together with its typecode
    '''
    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields

    value = property(get, set)
1000
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` built from `sequence`;
    `lock` is accepted but not used
    '''
    result = array.array(typecode, sequence)
    return result
1003
1004#
1005# Proxy types used by SyncManager
1006#
1007
class IteratorProxy(BaseProxy):
    '''
    Proxy which forwards the iterator/generator protocol to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy acts as its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
1020
1021
class AcquirerProxy(BaseProxy):
    '''
    Proxy for referents with `acquire`/`release`, usable in a `with` block
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # the timeout is only sent when the caller supplied one
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1033
1034
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition-like referent: adds `wait`, `notify` and
    `notify_all` to the acquire/release interface, plus a `wait_for`
    loop which runs in the calling process
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call `wait()` repeatedly until `predicate()` is true or `timeout`
        seconds have elapsed; return the last value of `predicate()`.

        `predicate` is evaluated locally, not in the manager process.
        '''
        # A monotonic clock is used for the deadline so that adjustments
        # to the system clock cannot lengthen or shorten the timeout.
        from time import monotonic

        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = monotonic() + timeout
        else:
            endtime = None
        waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - monotonic()
                if waittime <= 0:
                    break
            # waittime stays None (wait without timeout) when no
            # timeout was given
            self.wait(waittime)
            result = predicate()
        return result
1060
Benjamin Petersone711caf2008-06-11 16:44:04 +00001061
class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait` to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1072
Richard Oudkerk3730a172012-06-15 18:26:07 +01001073
class BarrierProxy(BaseProxy):
    '''
    Proxy forwarding `wait`, `abort` and `reset` to the referent; the
    `parties`, `n_waiting` and `broken` properties are read from the
    referent through its `__getattribute__`
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        attr = ('parties',)
        return self._callmethod('__getattribute__', attr)

    @property
    def n_waiting(self):
        attr = ('n_waiting',)
        return self._callmethod('__getattribute__', attr)

    @property
    def broken(self):
        attr = ('broken',)
        return self._callmethod('__getattribute__', attr)
1091
1092
class NamespaceProxy(BaseProxy):
    '''
    Proxy which forwards attribute access to the referent.

    Names starting with an underscore are handled on the proxy object
    itself; every other name is read, set or deleted on the referent
    through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    # `startswith` is used rather than `key[0]` so that an empty
    # attribute name is forwarded instead of raising IndexError here.

    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # looked up via object.__getattribute__ to bypass this class's
        # own attribute hooks
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1110
1111
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get` and `set`, also available as the `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        set_args = (value,)
        return self._callmethod('set', set_args)

    value = property(get, set)
1119
1120
# Generated proxy type exposing the list methods named below
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `+=` is carried out as an `extend` call on the referent
        extend_args = (value,)
        self._callmethod('extend', extend_args)
        return self

    def __imul__(self, value):
        imul_args = (value,)
        self._callmethod('__imul__', imul_args)
        return self
1133 return self
1134
1135
# Generated proxy type exposing the dict methods named below.
# NOTE(review): 'has_key' is a Python 2 dict method which Python 3 dicts
# do not define, so calls to it presumably fail -- confirm before removing.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
1141
1142
# Generated proxy type exposing length and item access only
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001146
1147
# Generated proxy type for pools; note that the type itself is named
# 'PoolProxy' even though it is bound to `BasePoolProxy` here
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# maps method names to the typeid registered for their results
BasePoolProxy._method_to_typeid_ = dict(
    apply_async='AsyncResult',
    map_async='AsyncResult',
    starmap_async='AsyncResult',
    imap='Iterator',
    imap_unordered='Iterator',
)
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    `terminate()`
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001164
1165#
1166# Definition of SyncManager
1167#
1168
class SyncManager(BaseManager):
    '''
    `BaseManager` subclass with a set of shared object types registered.

    The registered types are those meant for synchronizing threads,
    together with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''
1179
# queue types (both names are backed by `queue.Queue`)
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# synchronization primitives from `threading`
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy
    )
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)

# process pool
SyncManager.register('Pool', pool.Pool, PoolProxy)

# containers and value holders
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)