blob: 7973012b98d186e0b1532e66978b93d67004fa01 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Davin Pottse895de32019-02-23 22:08:16 -06002# Module providing manager classes for dealing
Benjamin Petersone711caf2008-06-11 16:44:04 +00003# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Davin Pottse895de32019-02-23 22:08:16 -060011__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12 'SharedMemoryManager' ]
Benjamin Petersone711caf2008-06-11 16:44:04 +000013
14#
15# Imports
16#
17
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import threading
20import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020022import time
Davin Pottse895de32019-02-23 22:08:16 -060023from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000024
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010025from traceback import format_exc
26
27from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050028from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import pool
30from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010032from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060033try:
34 from . import shared_memory
35 HAS_SHMEM = True
36except ImportError:
37 HAS_SHMEM = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Benjamin Petersone711caf2008-06-11 16:44:04 +000039#
Benjamin Petersone711caf2008-06-11 16:44:04 +000040# Register some things for pickling
41#
42
def reduce_array(a):
    '''
    Reduction helper for `array.array` objects

    Returns the constructor together with the typecode and the raw bytes
    needed to rebuild an equal array.
    '''
    typecode, payload = a.typecode, a.tobytes()
    return array.array, (typecode, payload)
reduction.register(array.array, reduce_array)

# The dictionary view types are registered so that views are sent
# across as plain lists built from their contents.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000053
54#
55# Type for identifying shared objects
56#
57
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token records the typeid the object was registered under, the
    address of the manager holding it and the object's id there.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The state is the plain (typeid, address, id) triple.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Unpack first so a malformed state fails before anything is set.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000076
77#
78# Function for communication with a manager's server process
79#
80
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple (id, methodname, args, kwds).  A '#RETURN'
    reply yields its payload; any other reply kind is turned into an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
90
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into the exception object to raise

    '#ERROR' replies already carry the exception.  '#TRACEBACK' and
    '#UNSERIALIZABLE' replies must carry a str (TypeError is raised
    otherwise) and are wrapped in RemoteError.  Any other kind gives a
    ValueError.  The exception is returned, not raised.
    '''
    if kind == '#ERROR':
        return result
    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))
    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))
    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000105
class RemoteError(Exception):
    '''
    Exception whose first argument is text received from the manager

    str() frames that text between two rules of 75 dashes.
    '''
    def __str__(self):
        rule = '-' * 75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
109
110#
111# Functions for finding the method names of an object
112#
113
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    Every name reported by dir() whose attribute is callable is included,
    in dir() order.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
124
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    candidates = all_methods(obj)
    return [name for name in candidates if name[0] != '_']
130
131#
132# Server which is run in a process controlled by a manager
133#
134
135class Server(object):
136 '''
137 Server class which runs in a process controlled by a manager object
138 '''
139 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
140 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
141
142 def __init__(self, registry, address, authkey, serializer):
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500143 if not isinstance(authkey, bytes):
144 raise TypeError(
145 "Authkey {0!r} is type {1!s}, not bytes".format(
146 authkey, type(authkey)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000147 self.registry = registry
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100148 self.authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149 Listener, Client = listener_client[serializer]
150
151 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100152 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153 self.address = self.listener.address
154
Jesse Noller63b3a972009-01-21 02:15:48 +0000155 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000156 self.id_to_refcount = {}
Davin Potts86a76682016-09-07 18:48:01 -0500157 self.id_to_local_proxy_obj = {}
158 self.mutex = threading.Lock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159
160 def serve_forever(self):
161 '''
162 Run the server forever
163 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100164 self.stop_event = threading.Event()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100165 process.current_process()._manager_server = self
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100167 accepter = threading.Thread(target=self.accepter)
168 accepter.daemon = True
169 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100171 while not self.stop_event.is_set():
172 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173 except (KeyboardInterrupt, SystemExit):
174 pass
175 finally:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500176 if sys.stdout != sys.__stdout__: # what about stderr?
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100177 util.debug('resetting stdout, stderr')
178 sys.stdout = sys.__stdout__
179 sys.stderr = sys.__stderr__
180 sys.exit(0)
181
182 def accepter(self):
183 while True:
184 try:
185 c = self.listener.accept()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200186 except OSError:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100187 continue
188 t = threading.Thread(target=self.handle_request, args=(c,))
189 t.daemon = True
190 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192 def handle_request(self, c):
193 '''
194 Handle a new connection
195 '''
196 funcname = result = request = None
197 try:
198 connection.deliver_challenge(c, self.authkey)
199 connection.answer_challenge(c, self.authkey)
200 request = c.recv()
201 ignore, funcname, args, kwds = request
202 assert funcname in self.public, '%r unrecognized' % funcname
203 func = getattr(self, funcname)
204 except Exception:
205 msg = ('#TRACEBACK', format_exc())
206 else:
207 try:
208 result = func(c, *args, **kwds)
209 except Exception:
210 msg = ('#TRACEBACK', format_exc())
211 else:
212 msg = ('#RETURN', result)
213 try:
214 c.send(msg)
215 except Exception as e:
216 try:
217 c.send(('#TRACEBACK', format_exc()))
218 except Exception:
219 pass
220 util.info('Failure to send message: %r', msg)
221 util.info(' ... request was %r', request)
222 util.info(' ... exception was %r', e)
223
224 c.close()
225
    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request is a tuple
        (ident, methodname, args, kwds); the reply is a (kind, value)
        pair whose kind is '#RETURN', '#PROXY', '#ERROR', '#TRACEBACK' or
        '#UNSERIALIZABLE'.  The thread exits via sys.exit() on EOF or
        when a reply cannot be sent at all.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                # reset so the AttributeError handler below can tell
                # whether the request was unpacked before the failure
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # fall back to the stash of local proxy objects; if
                    # the ident is not there either, report the first
                    # KeyError
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError as second_ke:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # an exception raised by the method itself is sent
                    # back as an object
                    msg = ('#ERROR', e)
                else:
                    # methods listed in gettypeid get their result
                    # registered as a new shared object and a token is
                    # returned in place of the value
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is not exposed or does not exist: try
                    # the fallbacks keyed by method name
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    # the reply could not be sent; send the traceback
                    # text in its place
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                # even the replacement reply failed: log, close the
                # connection and end this thread
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)
306
307 def fallback_getvalue(self, conn, ident, obj):
308 return obj
309
310 def fallback_str(self, conn, ident, obj):
311 return str(obj)
312
313 def fallback_repr(self, conn, ident, obj):
314 return repr(obj)
315
316 fallback_mapping = {
317 '__str__':fallback_str,
318 '__repr__':fallback_repr,
319 '#GETVALUE':fallback_getvalue
320 }
321
322 def dummy(self, c):
323 pass
324
325 def debug_info(self, c):
326 '''
327 Return some info --- useful to spot problems with refcounting
328 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500329 # Perhaps include debug info about 'c'?
Charles-François Natalia924fc72014-05-25 14:12:12 +0100330 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000331 result = []
Davin Potts86a76682016-09-07 18:48:01 -0500332 keys = list(self.id_to_refcount.keys())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000333 keys.sort()
334 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000335 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336 result.append(' %s: refcount=%s\n %s' %
337 (ident, self.id_to_refcount[ident],
338 str(self.id_to_obj[ident][0])[:75]))
339 return '\n'.join(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000340
341 def number_of_objects(self, c):
342 '''
343 Number of shared objects
344 '''
Davin Potts86a76682016-09-07 18:48:01 -0500345 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
346 return len(self.id_to_refcount)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000347
348 def shutdown(self, c):
349 '''
350 Shutdown this process
351 '''
352 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100353 util.debug('manager received shutdown message')
354 c.send(('#RETURN', None))
355 except:
356 import traceback
357 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000358 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100359 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000360
361 def create(self, c, typeid, *args, **kwds):
362 '''
363 Create a new shared object and return its id
364 '''
Charles-François Natalia924fc72014-05-25 14:12:12 +0100365 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000366 callable, exposed, method_to_typeid, proxytype = \
367 self.registry[typeid]
368
369 if callable is None:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500370 if kwds or (len(args) != 1):
371 raise ValueError(
372 "Without callable, must have one non-keyword argument")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000373 obj = args[0]
374 else:
375 obj = callable(*args, **kwds)
376
377 if exposed is None:
378 exposed = public_methods(obj)
379 if method_to_typeid is not None:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500380 if not isinstance(method_to_typeid, dict):
381 raise TypeError(
382 "Method_to_typeid {0!r}: type {1!s}, not dict".format(
383 method_to_typeid, type(method_to_typeid)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 exposed = list(exposed) + list(method_to_typeid)
385
386 ident = '%x' % id(obj) # convert to string because xmlrpclib
387 # only has 32 bit signed integers
388 util.debug('%r callable returned object with id %r', typeid, ident)
389
390 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
391 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000392 self.id_to_refcount[ident] = 0
Davin Potts86a76682016-09-07 18:48:01 -0500393
394 self.incref(c, ident)
395 return ident, tuple(exposed)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000396
397 def get_methods(self, c, token):
398 '''
399 Return the methods of the shared object indicated by token
400 '''
401 return tuple(self.id_to_obj[token.id][1])
402
403 def accept_connection(self, c, name):
404 '''
405 Spawn a new thread to serve this connection
406 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000407 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000408 c.send(('#RETURN', None))
409 self.serve_client(c)
410
411 def incref(self, c, ident):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100412 with self.mutex:
Davin Potts86a76682016-09-07 18:48:01 -0500413 try:
414 self.id_to_refcount[ident] += 1
415 except KeyError as ke:
416 # If no external references exist but an internal (to the
417 # manager) still does and a new external reference is created
418 # from it, restore the manager's tracking of it from the
419 # previously stashed internal ref.
420 if ident in self.id_to_local_proxy_obj:
421 self.id_to_refcount[ident] = 1
422 self.id_to_obj[ident] = \
423 self.id_to_local_proxy_obj[ident]
424 obj, exposed, gettypeid = self.id_to_obj[ident]
425 util.debug('Server re-enabled tracking & INCREF %r', ident)
426 else:
427 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000428
    def decref(self, c, ident):
        '''
        Drop one reference to the shared object `ident`

        When the count reaches zero the object is removed from the
        server's tables.  Raises AssertionError if the recorded refcount
        is not positive.
        '''
        # Nothing to do for an ident that is only known through the
        # stash of local proxy objects.
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        # The mutex has been released at this point; see the comment
        # below for why the disposal happens outside of it.
        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
455
Benjamin Petersone711caf2008-06-11 16:44:04 +0000456
457#
458# Class to represent state of a manager
459#
460
class State(object):
    '''
    Holder for the state of a manager

    `value` is one of the class constants INITIAL, STARTED and SHUTDOWN.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
466
467#
468# Mapping from serializer name to Listener and Client types
469#
470
# Each serializer name maps to a (Listener, Client) pair taken from the
# connection module; users index it with [0] or [1] or unpack both.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
475
476#
477# Definition of BaseManager
478#
479
480class BaseManager(object):
481 '''
482 Base class for managers
483 '''
484 _registry = {}
485 _Server = Server
486
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100487 def __init__(self, address=None, authkey=None, serializer='pickle',
488 ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000489 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100490 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000491 self._address = address # XXX not final address if eg ('', 0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100492 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000493 self._state = State()
494 self._state.value = State.INITIAL
495 self._serializer = serializer
496 self._Listener, self._Client = listener_client[serializer]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100497 self._ctx = ctx or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000498
Benjamin Petersone711caf2008-06-11 16:44:04 +0000499 def get_server(self):
500 '''
501 Return server object with serve_forever() method and address attribute
502 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500503 if self._state.value != State.INITIAL:
504 if self._state.value == State.STARTED:
505 raise ProcessError("Already started server")
506 elif self._state.value == State.SHUTDOWN:
507 raise ProcessError("Manager has shut down")
508 else:
509 raise ProcessError(
510 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000511 return Server(self._registry, self._address,
512 self._authkey, self._serializer)
513
514 def connect(self):
515 '''
516 Connect manager object to the server process
517 '''
518 Listener, Client = listener_client[self._serializer]
519 conn = Client(self._address, authkey=self._authkey)
520 dispatch(conn, None, 'dummy')
521 self._state.value = State.STARTED
522
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000523 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000524 '''
525 Spawn a server process for this manager object
526 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500527 if self._state.value != State.INITIAL:
528 if self._state.value == State.STARTED:
529 raise ProcessError("Already started server")
530 elif self._state.value == State.SHUTDOWN:
531 raise ProcessError("Manager has shut down")
532 else:
533 raise ProcessError(
534 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000535
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200536 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000537 raise TypeError('initializer must be a callable')
538
Benjamin Petersone711caf2008-06-11 16:44:04 +0000539 # pipe over which we will retrieve address of server
540 reader, writer = connection.Pipe(duplex=False)
541
542 # spawn process which runs a server
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100543 self._process = self._ctx.Process(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000544 target=type(self)._run_server,
545 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000546 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000547 )
548 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000549 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000550 self._process.start()
551
552 # get address of server
553 writer.close()
554 self._address = reader.recv()
555 reader.close()
556
557 # register a finalizer
558 self._state.value = State.STARTED
559 self.shutdown = util.Finalize(
560 self, type(self)._finalize_manager,
561 args=(self._process, self._address, self._authkey,
562 self._state, self._Client),
563 exitpriority=0
564 )
565
566 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000567 def _run_server(cls, registry, address, authkey, serializer, writer,
568 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000569 '''
570 Create a server, report its address and run it
571 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000572 if initializer is not None:
573 initializer(*initargs)
574
Benjamin Petersone711caf2008-06-11 16:44:04 +0000575 # create server
576 server = cls._Server(registry, address, authkey, serializer)
577
578 # inform parent process of the server's address
579 writer.send(server.address)
580 writer.close()
581
582 # run the manager
583 util.info('manager serving at %r', server.address)
584 server.serve_forever()
585
586 def _create(self, typeid, *args, **kwds):
587 '''
588 Create a new shared object; return the token and exposed tuple
589 '''
590 assert self._state.value == State.STARTED, 'server not yet started'
591 conn = self._Client(self._address, authkey=self._authkey)
592 try:
593 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
594 finally:
595 conn.close()
596 return Token(typeid, self._address, id), exposed
597
598 def join(self, timeout=None):
599 '''
600 Join the manager process (if it has been spawned)
601 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100602 if self._process is not None:
603 self._process.join(timeout)
604 if not self._process.is_alive():
605 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000606
607 def _debug_info(self):
608 '''
609 Return some info about the servers shared objects and connections
610 '''
611 conn = self._Client(self._address, authkey=self._authkey)
612 try:
613 return dispatch(conn, None, 'debug_info')
614 finally:
615 conn.close()
616
617 def _number_of_objects(self):
618 '''
619 Return the number of shared objects
620 '''
621 conn = self._Client(self._address, authkey=self._authkey)
622 try:
623 return dispatch(conn, None, 'number_of_objects')
624 finally:
625 conn.close()
626
627 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100628 if self._state.value == State.INITIAL:
629 self.start()
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500630 if self._state.value != State.STARTED:
631 if self._state.value == State.INITIAL:
632 raise ProcessError("Unable to start server")
633 elif self._state.value == State.SHUTDOWN:
634 raise ProcessError("Manager has shut down")
635 else:
636 raise ProcessError(
637 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000638 return self
639
640 def __exit__(self, exc_type, exc_val, exc_tb):
641 self.shutdown()
642
643 @staticmethod
644 def _finalize_manager(process, address, authkey, state, _Client):
645 '''
646 Shutdown the manager process; will be registered as a finalizer
647 '''
648 if process.is_alive():
649 util.info('sending shutdown message to manager')
650 try:
651 conn = _Client(address, authkey=authkey)
652 try:
653 dispatch(conn, None, 'shutdown')
654 finally:
655 conn.close()
656 except Exception:
657 pass
658
Richard Oudkerk3049f122012-06-15 20:08:29 +0100659 process.join(timeout=1.0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000660 if process.is_alive():
661 util.info('manager still alive')
662 if hasattr(process, 'terminate'):
663 util.info('trying to `terminate()` manager process')
664 process.terminate()
665 process.join(timeout=0.1)
666 if process.is_alive():
667 util.info('manager still alive after terminate')
668
669 state.value = State.SHUTDOWN
670 try:
671 del BaseProxy._address_to_local[address]
672 except KeyError:
673 pass
674
Serhiy Storchakabdf6b912017-03-19 08:40:32 +0200675 @property
676 def address(self):
677 return self._address
Benjamin Petersone711caf2008-06-11 16:44:04 +0000678
679 @classmethod
680 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
681 method_to_typeid=None, create_method=True):
682 '''
683 Register a typeid with the manager type
684 '''
685 if '_registry' not in cls.__dict__:
686 cls._registry = cls._registry.copy()
687
688 if proxytype is None:
689 proxytype = AutoProxy
690
691 exposed = exposed or getattr(proxytype, '_exposed_', None)
692
693 method_to_typeid = method_to_typeid or \
694 getattr(proxytype, '_method_to_typeid_', None)
695
696 if method_to_typeid:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500697 for key, value in list(method_to_typeid.items()): # isinstance?
Benjamin Petersone711caf2008-06-11 16:44:04 +0000698 assert type(key) is str, '%r is not a string' % key
699 assert type(value) is str, '%r is not a string' % value
700
701 cls._registry[typeid] = (
702 callable, exposed, method_to_typeid, proxytype
703 )
704
705 if create_method:
706 def temp(self, *args, **kwds):
707 util.debug('requesting creation of a shared %r object', typeid)
708 token, exp = self._create(typeid, *args, **kwds)
709 proxy = proxytype(
710 token, self._serializer, manager=self,
711 authkey=self._authkey, exposed=exp
712 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000713 conn = self._Client(token.address, authkey=self._authkey)
714 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000715 return proxy
716 temp.__name__ = typeid
717 setattr(cls, typeid, temp)
718
719#
# Subclass of set which gets cleared after a fork
721#
722
class ProcessLocalSet(set):
    '''
    Set that registers an after-fork hook which clears it

    Reducing an instance yields just its type, so the contents are not
    part of the reduced form.
    '''
    def __init__(self):
        util.register_after_fork(self, lambda obj: obj.clear())

    def __reduce__(self):
        cls = type(self)
        return cls, ()
728
729#
730# Definition of BaseProxy
731#
732
733class BaseProxy(object):
734 '''
735 A base for proxies of shared objects
736 '''
737 _address_to_local = {}
738 _mutex = util.ForkAwareThreadLock()
739
740 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500741 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100742 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000743 tls_idset = BaseProxy._address_to_local.get(token.address, None)
744 if tls_idset is None:
745 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
746 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000747
748 # self._tls is used to record the connection used by this
749 # thread to communicate with the manager at token.address
750 self._tls = tls_idset[0]
751
752 # self._idset is used to record the identities of all shared
753 # objects for which the current process owns references and
754 # which are in the manager at token.address
755 self._idset = tls_idset[1]
756
757 self._token = token
758 self._id = self._token.id
759 self._manager = manager
760 self._serializer = serializer
761 self._Client = listener_client[serializer][1]
762
Davin Potts86a76682016-09-07 18:48:01 -0500763 # Should be set to True only when a proxy object is being created
764 # on the manager server; primary use case: nested proxy objects.
765 # RebuildProxy detects when a proxy is being created on the manager
766 # and sets this value appropriately.
767 self._owned_by_manager = manager_owned
768
Benjamin Petersone711caf2008-06-11 16:44:04 +0000769 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100770 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000771 elif self._manager is not None:
772 self._authkey = self._manager._authkey
773 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100774 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000775
776 if incref:
777 self._incref()
778
779 util.register_after_fork(self, BaseProxy._after_fork)
780
781 def _connect(self):
782 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100783 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000784 if threading.current_thread().name != 'MainThread':
785 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786 conn = self._Client(self._token.address, authkey=self._authkey)
787 dispatch(conn, None, 'accept_connection', (name,))
788 self._tls.connection = conn
789
790 def _callmethod(self, methodname, args=(), kwds={}):
791 '''
792 Try to call a method of the referrent and return a copy of the result
793 '''
794 try:
795 conn = self._tls.connection
796 except AttributeError:
797 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000798 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000799 self._connect()
800 conn = self._tls.connection
801
802 conn.send((self._id, methodname, args, kwds))
803 kind, result = conn.recv()
804
805 if kind == '#RETURN':
806 return result
807 elif kind == '#PROXY':
808 exposed, token = result
809 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100810 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000811 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 token, self._serializer, manager=self._manager,
813 authkey=self._authkey, exposed=exposed
814 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000815 conn = self._Client(token.address, authkey=self._authkey)
816 dispatch(conn, None, 'decref', (token.id,))
817 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000818 raise convert_to_error(kind, result)
819
820 def _getvalue(self):
821 '''
822 Get a copy of the value of the referent
823 '''
824 return self._callmethod('#GETVALUE')
825
826 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500827 if self._owned_by_manager:
828 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
829 return
830
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 conn = self._Client(self._token.address, authkey=self._authkey)
832 dispatch(conn, None, 'incref', (self._id,))
833 util.debug('INCREF %r', self._token.id)
834
835 self._idset.add(self._id)
836
837 state = self._manager and self._manager._state
838
839 self._close = util.Finalize(
840 self, BaseProxy._decref,
841 args=(self._token, self._authkey, state,
842 self._tls, self._idset, self._Client),
843 exitpriority=10
844 )
845
846 @staticmethod
847 def _decref(token, authkey, state, tls, idset, _Client):
848 idset.discard(token.id)
849
850 # check whether manager is still alive
851 if state is None or state.value == State.STARTED:
852 # tell manager this process no longer cares about referent
853 try:
854 util.debug('DECREF %r', token.id)
855 conn = _Client(token.address, authkey=authkey)
856 dispatch(conn, None, 'decref', (token.id,))
857 except Exception as e:
858 util.debug('... decref failed %s', e)
859
860 else:
861 util.debug('DECREF %r -- manager already shutdown', token.id)
862
863 # check whether we can close this thread's connection because
864 # the process owns no more references to objects for this manager
865 if not idset and hasattr(tls, 'connection'):
866 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000867 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868 tls.connection.close()
869 del tls.connection
870
871 def _after_fork(self):
872 self._manager = None
873 try:
874 self._incref()
875 except Exception as e:
876 # the proxy may just be for a manager which has shutdown
877 util.info('incref failed: %s' % e)
878
879 def __reduce__(self):
880 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500881 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882 kwds['authkey'] = self._authkey
883
884 if getattr(self, '_isauto', False):
885 kwds['exposed'] = self._exposed_
886 return (RebuildProxy,
887 (AutoProxy, self._token, self._serializer, kwds))
888 else:
889 return (RebuildProxy,
890 (type(self), self._token, self._serializer, kwds))
891
892 def __deepcopy__(self, memo):
893 return self._getvalue()
894
895 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300896 return '<%s object, typeid %r at %#x>' % \
897 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898
899 def __str__(self):
900 '''
901 Return representation of the referent (or a fall-back if that fails)
902 '''
903 try:
904 return self._callmethod('__repr__')
905 except Exception:
906 return repr(self)[:-1] + "; '__str__()' failed>"
907
908#
909# Function used for unpickling
910#
911
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When unpickling happens inside the manager server that owns `token`,
    the proxy is flagged as manager-owned and the referent is recorded in
    the server's `id_to_local_proxy_obj` mapping.  The proxy increfs only
    if `kwds` allows it and the current process is not in the middle of
    inheriting state.  Note that `kwds` is modified in place.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)

    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(current, '_inheriting', False)

    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000928
929#
930# Functions to create proxies and proxy types
931#
932
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoized in `_cache` (deliberately a shared default) under
    the key `(name, tuple(exposed))`, so repeated calls with the same
    arguments return the same class.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    namespace = {}
    # each exposed name becomes a method forwarding to _callmethod
    template = '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
953
954
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the method names are requested from the manager
    ('get_methods').  A missing `authkey` falls back to the manager's key
    and then to the current process's key.

    `manager_owned` is forwarded to the proxy type.  `RebuildProxy` adds
    `manager_owned=True` to the keyword arguments when an auto-proxy is
    unpickled inside its own manager server, so this function must accept
    it; without the parameter that call fails with TypeError.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marks the proxy so that __reduce__ pickles it as an auto-proxy
    proxy._isauto = True
    return proxy
979
980#
981# Types/callables which we will register with SyncManager
982#
983
class Namespace(object):
    '''Plain attribute container; keyword arguments become attributes.'''

    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # only names without a leading underscore are shown, sorted by
        # their rendered 'name=value' text
        snapshot = list(self.__dict__.items())
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in snapshot
            if not name.startswith('_')
        )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000995
class Value(object):
    '''
    Holder for a single value tagged with a typecode.

    `lock` is accepted but not used by this class.
    '''

    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)'%(cls_name, self._typecode, self._value)

    # attribute-style access built on the accessor methods above
    value = property(get, set)
1007
def Array(typecode, sequence, lock=True):
    '''
    Build an `array.array` of `typecode` from `sequence`.

    `lock` is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
1010
1011#
1012# Proxy types used by SyncManager
1013#
1014
class IteratorProxy(BaseProxy):
    '''Proxy forwarding the iterator/generator methods to the referent.'''

    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy acts as its own iterator; no remote call is made here
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
1027
1028
class AcquirerProxy(BaseProxy):
    '''Proxy for objects offering `acquire`/`release`; usable in `with`.'''

    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # the timeout is only forwarded when one was actually given
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1040
1041
class ConditionProxy(AcquirerProxy):
    '''Proxy adding `wait`/`notify`/`notify_all` on top of AcquirerProxy.'''

    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed.

        The predicate is evaluated in the calling process; only the
        individual `wait` calls go through the proxy.  Returns the last
        value of the predicate.
        '''
        outcome = predicate()
        if outcome:
            return outcome

        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not outcome:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome
1067
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068
class EventProxy(BaseProxy):
    '''Proxy forwarding the event methods listed in `_exposed_`.'''

    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1079
Richard Oudkerk3730a172012-06-15 18:26:07 +01001080
class BarrierProxy(BaseProxy):
    '''
    Proxy for barrier objects.

    Methods are forwarded directly; the read-only attributes are fetched
    through the referent's `__getattribute__`.
    '''

    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        attr = ('parties',)
        return self._callmethod('__getattribute__', attr)

    @property
    def n_waiting(self):
        attr = ('n_waiting',)
        return self._callmethod('__getattribute__', attr)

    @property
    def broken(self):
        attr = ('broken',)
        return self._callmethod('__getattribute__', attr)
1098
1099
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attribute access is forwarded to the referent.

    Names starting with an underscore are handled locally on the proxy
    object; every other name is read, set or deleted on the referent.
    '''

    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # fetch _callmethod without going through this hook again
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__delattr__', (key,))
        return object.__delattr__(self, key)
1117
1118
class ValueProxy(BaseProxy):
    '''Proxy exposing the referent's `get`/`set` and a `value` property.'''

    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        payload = (value,)
        return self._callmethod('set', payload)

    # attribute-style access; each use is a remote get/set call
    value = property(get, set)
1126
1127
# Methods forwarded verbatim to the referent list.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself, so that
    `p += x` and `p *= n` leave `p` bound to the proxy.
    '''

    def __iadd__(self, value):
        # in-place add is carried out remotely as extend()
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1141
1142
1143DictProxy = MakeProxyType('DictProxy', (
Serhiy Storchakae0e50652018-09-17 14:24:01 +03001144 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
Rémi Lapeyrea31f4cc2019-02-12 01:37:24 +01001145 '__setitem__', 'clear', 'copy', 'get', 'items',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001146 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1147 ))
Serhiy Storchakae0e50652018-09-17 14:24:01 +03001148DictProxy._method_to_typeid_ = {
1149 '__iter__': 'Iterator',
1150 }
Benjamin Petersone711caf2008-06-11 16:44:04 +00001151
1152
# Only length and item access are forwarded for arrays.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001156
1157
# Methods forwarded verbatim to the referent pool.
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# typeids under which the results of these methods are handed back
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''Pool proxy usable as a context manager; leaving it calls terminate().'''

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001174
1175#
1176# Definition of SyncManager
1177#
1178
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''


# queues (registered without an explicit proxy type)
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# synchronization primitives
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy
)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)

# worker pool and containers
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001210
1211#
1212# Definition of SharedMemoryManager and SharedMemoryServer
1213#
1214
1215if HAS_SHMEM:
1216 class _SharedMemoryTracker:
1217 "Manages one or more shared memory segments."
1218
1219 def __init__(self, name, segment_names=[]):
1220 self.shared_memory_context_name = name
1221 self.segment_names = segment_names
1222
1223 def register_segment(self, segment_name):
1224 "Adds the supplied shared memory block name to tracker."
1225 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1226 self.segment_names.append(segment_name)
1227
1228 def destroy_segment(self, segment_name):
1229 """Calls unlink() on the shared memory block with the supplied name
1230 and removes it from the list of blocks being tracked."""
1231 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1232 self.segment_names.remove(segment_name)
1233 segment = shared_memory.SharedMemory(segment_name)
1234 segment.close()
1235 segment.unlink()
1236
1237 def unlink(self):
1238 "Calls destroy_segment() on all tracked shared memory blocks."
1239 for segment_name in self.segment_names[:]:
1240 self.destroy_segment(segment_name)
1241
1242 def __del__(self):
1243 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1244 self.unlink()
1245
1246 def __getstate__(self):
1247 return (self.shared_memory_context_name, self.segment_names)
1248
1249 def __setstate__(self, state):
1250 self.__init__(*state)
1251
1252
1253 class SharedMemoryServer(Server):
1254
1255 public = Server.public + \
1256 ['track_segment', 'release_segment', 'list_segments']
1257
1258 def __init__(self, *args, **kwargs):
1259 Server.__init__(self, *args, **kwargs)
1260 self.shared_memory_context = \
1261 _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
1262 util.debug(f"SharedMemoryServer started by pid {getpid()}")
1263
1264 def create(self, c, typeid, *args, **kwargs):
1265 """Create a new distributed-shared object (not backed by a shared
1266 memory block) and return its id to be used in a Proxy Object."""
1267 # Unless set up as a shared proxy, don't make shared_memory_context
1268 # a standard part of kwargs. This makes things easier for supplying
1269 # simple functions.
1270 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1271 kwargs['shared_memory_context'] = self.shared_memory_context
1272 return Server.create(self, c, typeid, *args, **kwargs)
1273
1274 def shutdown(self, c):
1275 "Call unlink() on all tracked shared memory, terminate the Server."
1276 self.shared_memory_context.unlink()
1277 return Server.shutdown(self, c)
1278
1279 def track_segment(self, c, segment_name):
1280 "Adds the supplied shared memory block name to Server's tracker."
1281 self.shared_memory_context.register_segment(segment_name)
1282
1283 def release_segment(self, c, segment_name):
1284 """Calls unlink() on the shared memory block with the supplied name
1285 and removes it from the tracker instance inside the Server."""
1286 self.shared_memory_context.destroy_segment(segment_name)
1287
1288 def list_segments(self, c):
1289 """Returns a list of names of shared memory blocks that the Server
1290 is currently tracking."""
1291 return self.shared_memory_context.segment_names
1292
1293
1294 class SharedMemoryManager(BaseManager):
1295 """Like SyncManager but uses SharedMemoryServer instead of Server.
1296
1297 It provides methods for creating and returning SharedMemory instances
1298 and for creating a list-like object (ShareableList) backed by shared
1299 memory. It also provides methods that create and return Proxy Objects
1300 that support synchronization across processes (i.e. multi-process-safe
1301 locks and semaphores).
1302 """
1303
1304 _Server = SharedMemoryServer
1305
1306 def __init__(self, *args, **kwargs):
1307 BaseManager.__init__(self, *args, **kwargs)
1308 util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1309
1310 def __del__(self):
1311 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1312 pass
1313
1314 def get_server(self):
1315 'Better than monkeypatching for now; merge into Server ultimately'
1316 if self._state.value != State.INITIAL:
1317 if self._state.value == State.STARTED:
1318 raise ProcessError("Already started SharedMemoryServer")
1319 elif self._state.value == State.SHUTDOWN:
1320 raise ProcessError("SharedMemoryManager has shut down")
1321 else:
1322 raise ProcessError(
1323 "Unknown state {!r}".format(self._state.value))
1324 return self._Server(self._registry, self._address,
1325 self._authkey, self._serializer)
1326
1327 def SharedMemory(self, size):
1328 """Returns a new SharedMemory instance with the specified size in
1329 bytes, to be tracked by the manager."""
1330 with self._Client(self._address, authkey=self._authkey) as conn:
1331 sms = shared_memory.SharedMemory(None, create=True, size=size)
1332 try:
1333 dispatch(conn, None, 'track_segment', (sms.name,))
1334 except BaseException as e:
1335 sms.unlink()
1336 raise e
1337 return sms
1338
1339 def ShareableList(self, sequence):
1340 """Returns a new ShareableList instance populated with the values
1341 from the input sequence, to be tracked by the manager."""
1342 with self._Client(self._address, authkey=self._authkey) as conn:
1343 sl = shared_memory.ShareableList(sequence)
1344 try:
1345 dispatch(conn, None, 'track_segment', (sl.shm.name,))
1346 except BaseException as e:
1347 sl.shm.unlink()
1348 raise e
1349 return sl