blob: 80c3ddb9154a4da88b74dd998882f70d2808da7e [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Davin Pottse895de32019-02-23 22:08:16 -06002# Module providing manager classes for dealing
Benjamin Petersone711caf2008-06-11 16:44:04 +00003# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Davin Pottse895de32019-02-23 22:08:16 -060011__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12 'SharedMemoryManager' ]
Benjamin Petersone711caf2008-06-11 16:44:04 +000013
14#
15# Imports
16#
17
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import threading
20import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020022import time
Davin Pottse895de32019-02-23 22:08:16 -060023from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000024
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010025from traceback import format_exc
26
27from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050028from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import pool
30from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010032from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060033try:
34 from . import shared_memory
35 HAS_SHMEM = True
36except ImportError:
37 HAS_SHMEM = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Benjamin Petersone711caf2008-06-11 16:44:04 +000039#
Benjamin Petersone711caf2008-06-11 16:44:04 +000040# Register some things for pickling
41#
42
def reduce_array(a):
    '''
    Reduction function for `array.array` objects.

    Returns the `array.array` constructor together with the
    `(typecode, raw bytes)` arguments needed to rebuild `a`.
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
reduction.register(array.array, reduce_array)

# Register the types of the dict views (items/keys/values) so that they
# are transferred as plain lists.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # true whenever dict.items() is a view
    def rebuild_as_list(obj):
        '''
        Reduction function which rebuilds `obj` as a list of its elements
        '''
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000053
54#
55# Type for identifying shared objects
56#
57
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token records the `typeid` the object was registered under, the
    `address` of the manager holding it and the object's `id`.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # State is the plain 3-tuple of the slot values.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Unpack first so a malformed state leaves the token untouched.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000076
77#
78# Function for communication with a manager's server process
79#
80
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request to the manager over connection `c` and return the reply.

    The request is the tuple `(id, methodname, args, kwds)`.  The reply is
    a `(kind, result)` pair: `result` is returned when `kind` is
    '#RETURN'; otherwise the exception built by `convert_to_error()` is
    raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
90
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply `(kind, result)` into an exception object.

    The exception is returned, not raised; the only thing raised here is
    TypeError, when a '#TRACEBACK' or '#UNSERIALIZABLE' reply does not
    carry a string.
    '''
    if kind == '#ERROR':
        # The reply already carries the exception instance.
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000105
class RemoteError(Exception):
    '''
    Exception whose first argument is text reported by the manager;
    `str()` frames that text between two 75-character dashed rules.
    '''
    def __str__(self):
        rule = '-' * 75
        text = str(self.args[0])
        return '\n' + rule + '\n' + text + rule
109
110#
111# Functions for finding the method names of an object
112#
113
def all_methods(obj):
    '''
    Return a list of the names in `dir(obj)` whose attribute is callable
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
124
def public_methods(obj):
    '''
    Return the names from `all_methods(obj)` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
130
131#
132# Server which is run in a process controlled by a manager
133#
134
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    The server keeps three tables keyed by object id (a hex string):
    `id_to_obj` maps to `(obj, exposed, method_to_typeid)`,
    `id_to_refcount` maps to the reference count, and
    `id_to_local_proxy_obj` is consulted as a fallback when an id is
    missing from the other two.
    '''
    # Names of the methods a client may invoke through handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the empty bookkeeping tables.

        `registry` maps a typeid to the 4-tuple
        `(callable, exposed, method_to_typeid, proxytype)` read by
        `create()`.  `authkey` must be bytes (TypeError otherwise).
        `serializer` selects the Listener type from `listener_client`.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener = listener_client[serializer][0]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Starts the accepter in a daemon thread and then waits until
        `stop_event` is set; always finishes by calling `sys.exit(0)`.
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                # Poll so that the wait is periodically interrupted.
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each in a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one `(ignore, funcname, args, kwds)`
        request, calls the named public method with `c` as first argument
        and sends back ('#RETURN', result) or ('#TRACEBACK', text).  The
        connection is closed on every exit path.
        '''
        funcname = result = request = None
        try:
            try:
                connection.deliver_challenge(c, self.authkey)
                connection.answer_challenge(c, self.authkey)
                request = c.recv()
                ignore, funcname, args, kwds = request
                # Explicit raise rather than `assert`: the name comes from
                # the client and the check must survive `python -O`.
                if funcname not in self.public:
                    raise AssertionError('%r unrecognized' % funcname)
                func = getattr(self, funcname)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                try:
                    result = func(c, *args, **kwds)
                except Exception:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    msg = ('#RETURN', result)
            try:
                c.send(msg)
            except Exception as e:
                try:
                    c.send(('#TRACEBACK', format_exc()))
                except Exception:
                    pass
                util.info('Failure to send message: %r', msg)
                util.info(' ... request was %r', request)
                util.info(' ... exception was %r', e)
        finally:
            # serve_client() leaves via sys.exit(); SystemExit is not an
            # Exception, so only a `finally` guarantees the close.
            c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `stop_event` is set, answering each
        `(ident, methodname, args, kwds)` request with one of '#RETURN',
        '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.  The thread
        exits through `sys.exit()` on EOF or when a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        # Report the lookup failure on the primary table.
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # Errors raised by the referent travel back as objects.
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result is itself shared: register it and
                        # answer with a token instead of the value.
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                # Unexposed or missing method: try the built-in fallbacks.
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''Fallback for '#GETVALUE': return the referent itself'''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''Fallback for '__str__': return str() of the referent'''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''Fallback for '__repr__': return repr() of the referent'''
        return repr(obj)

    # Plain functions (not bound methods); serve_client() passes `self`
    # explicitly when calling them.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''Do nothing'''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting

        The result is one entry per refcounted id, in sorted id order,
        giving the refcount and the first 75 characters of str(obj).
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c` and sets `stop_event`; the event
        is set even if the acknowledgement fails.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(*args, **kwds):
        '''
        Create a new shared object and return its id

        Called as `create(self, c, typeid, *args, **kwds)`; returns
        `(ident, exposed)` where `exposed` is a tuple of method names.
        Passing `typeid` or `c` by keyword still works but emits a
        DeprecationWarning.  The remaining arguments go to the callable
        registered for `typeid`; if none is registered, exactly one
        positional argument is required and is used as the object.
        '''
        # NOTE(review): the *args signature presumably exists so that
        # keyword arguments named 'self', 'c' or 'typeid' can be forwarded
        # to the registered callable -- confirm before simplifying.
        if len(args) >= 3:
            self, c, typeid, *args = args
        elif not args:
            raise TypeError("descriptor 'create' of 'Server' object "
                            "needs an argument")
        else:
            if 'typeid' not in kwds:
                raise TypeError('create expected at least 2 positional '
                                'arguments, got %d' % (len(args)-1))
            typeid = kwds.pop('typeid')
            if len(args) >= 2:
                self, c, *args = args
                import warnings
                warnings.warn("Passing 'typeid' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
            else:
                if 'c' not in kwds:
                    raise TypeError('create expected at least 2 positional '
                                    'arguments, got %d' % (len(args)-1))
                c = kwds.pop('c')
                self, *args = args
                import warnings
                warnings.warn("Passing 'c' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
        args = tuple(args)

        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                self.registry[typeid]

            if factory is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the mutex itself, so call it after releasing it.
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection

        Renames the current thread to `name`, acknowledges the request and
        then serves the connection from this thread.
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`

        Raises KeyError if `ident` is in neither `id_to_refcount` nor
        `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident`

        When the count reaches zero the object is dropped from both
        tables.  Raises AssertionError if the stored count is not positive.
        '''
        # NOTE(review): this membership test runs without holding
        # self.mutex.
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
481
Benjamin Petersone711caf2008-06-11 16:44:04 +0000482
483#
484# Class to represent state of a manager
485#
486
class State(object):
    '''
    Holder for a manager's lifecycle state; `value` is one of the
    constants below.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = range(3)
492
493#
494# Mapping from serializer name to Listener and Client types
495#
496
# Each entry is a (Listener type, Client type) pair, keyed by the
# serializer name accepted by Server and BaseManager.
listener_client = {
    'pickle': (connection.Listener, connection.Client),
    'xmlrpclib': (connection.XmlListener, connection.XmlClient),
}
501
502#
503# Definition of BaseManager
504#
505
class BaseManager(object):
    '''
    Base class for managers

    Class attributes: `_registry` maps each registered typeid to
    `(callable, exposed, method_to_typeid, proxytype)`; `_Server` is the
    server class instantiated by `get_server()` and `_run_server()`.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection parameters; no process is started here.

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of `listener_client` and `ctx` defaults to
        `get_context()`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is still in its initial
        state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class's configured server type, as _run_server() does,
        # so that a subclass overriding `_Server` is honoured here too.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process

        Sends a 'dummy' request to check that the server is reachable and
        then marks the manager as started.
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # The connection only served the probe; do not leave it open.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If given, `initializer(*initargs)` is called in the server process
        before the server is created.  Raises ProcessError unless the
        manager is in its initial state and TypeError if `initializer` is
        not callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(*args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple

        Called as `_create(self, typeid, *args, **kwds)`; the remaining
        arguments are forwarded to the server's 'create' request.
        '''
        self, typeid, *args = args
        args = tuple(args)

        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` is only set by start(); a manager that was never
        # started (e.g. one that used connect()) has nothing to join.
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        '''
        Start the manager if it is still in its initial state and return
        it; raises ProcessError if it is not started afterwards.
        '''
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''Shut the manager down via the finalizer installed by start()'''
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: fall through to join/terminate below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        '''The address stored for the manager's server'''
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and
        `method_to_typeid` fall back to the proxy type's `_exposed_` and
        `_method_to_typeid_` attributes.  When `create_method` is true a
        method named `typeid` is added to the class; it creates the shared
        object on the server and returns a proxy for it.
        '''
        # Give the class its own registry the first time it registers
        # something, so that the parent class's registry is not modified.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items(): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Release the reference taken by the server's create()
                # now that a proxy exists for the object.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
747
748#
749# Subclass of set which gets cleared after a fork
750#
751
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork and which
    reduces to an empty instance of its own type.
    '''
    def __init__(self):
        def _clear_after_fork(obj):
            obj.clear()
        util.register_after_fork(self, _clear_after_fork)

    def __reduce__(self):
        # The contents are deliberately left out of the reduced form.
        cls = type(self)
        return cls, ()
757
758#
759# Definition of BaseProxy
760#
761
762class BaseProxy(object):
763 '''
764 A base for proxies of shared objects
765 '''
766 _address_to_local = {}
767 _mutex = util.ForkAwareThreadLock()
768
def __init__(self, token, serializer, manager=None,
             authkey=None, exposed=None, incref=True, manager_owned=False):
    '''
    Set up a proxy for the shared object identified by `token`.

    `serializer` must be a key of `listener_client`.  The authkey is
    taken from `authkey`, else from `manager`, else from the current
    process.  Unless `incref` is false, `_incref()` is called.
    '''
    address = token.address
    with BaseProxy._mutex:
        per_address = BaseProxy._address_to_local
        tls_idset = per_address.get(address, None)
        if tls_idset is None:
            tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
            per_address[address] = tls_idset

    # self._tls is used to record the connection used by this
    # thread to communicate with the manager at token.address
    self._tls = tls_idset[0]

    # self._idset is used to record the identities of all shared
    # objects for which the current process owns references and
    # which are in the manager at token.address
    self._idset = tls_idset[1]

    self._token = token
    self._id = self._token.id
    self._manager = manager
    self._serializer = serializer
    self._Client = listener_client[serializer][1]

    # Should be set to True only when a proxy object is being created
    # on the manager server; primary use case: nested proxy objects.
    # RebuildProxy detects when a proxy is being created on the manager
    # and sets this value appropriately.
    self._owned_by_manager = manager_owned

    if authkey is None:
        if self._manager is None:
            self._authkey = process.current_process().authkey
        else:
            self._authkey = self._manager._authkey
    else:
        self._authkey = process.AuthenticationString(authkey)

    if incref:
        self._incref()

    util.register_after_fork(self, BaseProxy._after_fork)
809
def _connect(self):
    '''
    Open this thread's connection to the manager and store it on
    `self._tls`.

    The connection is announced with an 'accept_connection' request
    naming the current process and, outside the main thread, the thread.
    '''
    util.debug('making connection to manager')
    name = process.current_process().name
    thread_name = threading.current_thread().name
    if thread_name != 'MainThread':
        name = name + '|' + threading.current_thread().name
    conn = self._Client(self._token.address, authkey=self._authkey)
    dispatch(conn, None, 'accept_connection', (name,))
    self._tls.connection = conn
818
def _callmethod(self, methodname, args=(), kwds={}):
    '''
    Try to call a method of the referent and return a copy of the result

    The request is sent over the connection stored on `self._tls`, which
    is created on demand by `_connect()`.  A '#RETURN' reply yields the
    value itself, a '#PROXY' reply yields a new proxy built for the
    returned token, and any other reply is raised through
    `convert_to_error()`.
    '''
    try:
        channel = self._tls.connection
    except AttributeError:
        util.debug('thread %r does not own a connection',
                   threading.current_thread().name)
        self._connect()
        channel = self._tls.connection

    channel.send((self._id, methodname, args, kwds))
    kind, result = channel.recv()

    if kind == '#RETURN':
        return result

    if kind != '#PROXY':
        raise convert_to_error(kind, result)

    exposed, token = result
    proxytype = self._manager._registry[token.typeid][-1]
    # reach the new object through the same address this proxy uses
    token.address = self._token.address
    proxy = proxytype(
        token, self._serializer, manager=self._manager,
        authkey=self._authkey, exposed=exposed
        )
    # NOTE(review): presumably this releases a reference the server held
    # on behalf of this call now that `proxy` exists -- confirm against
    # the server's 'decref' handler.
    decref_conn = self._Client(token.address, authkey=self._authkey)
    dispatch(decref_conn, None, 'decref', (token.id,))
    return proxy
848
849 def _getvalue(self):
850 '''
851 Get a copy of the value of the referent
852 '''
853 return self._callmethod('#GETVALUE')
854
def _incref(self):
    '''
    Register a new reference to the referent with the manager.

    Nothing is sent for proxies flagged as owned by the manager.
    Otherwise an 'incref' request for this proxy's id is sent over a
    fresh connection, the id is recorded in `self._idset`, and a
    `util.Finalize` is stored on `self._close` that will call
    `BaseProxy._decref` with the matching arguments.
    '''
    if self._owned_by_manager:
        util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
        return

    conn = self._Client(self._token.address, authkey=self._authkey)
    dispatch(conn, None, 'incref', (self._id,))
    util.debug('INCREF %r', self._token.id)

    self._idset.add(self._id)

    # a falsy manager (e.g. None) is passed through unchanged as `state`
    manager = self._manager
    state = manager._state if manager else manager

    finalizer_args = (self._token, self._authkey, state,
                      self._tls, self._idset, self._Client)
    self._close = util.Finalize(
        self, BaseProxy._decref, args=finalizer_args, exitpriority=10
        )
874
@staticmethod
def _decref(token, authkey, state, tls, idset, _Client):
    '''
    Drop the reference identified by `token`.

    `token.id` is removed from `idset`.  If `state` is None or reports
    `State.STARTED`, a 'decref' request is sent to the manager; a failure
    there is logged and not raised.  When `idset` ends up empty and `tls`
    holds a connection, that connection is closed and removed.
    '''
    idset.discard(token.id)

    # check whether manager is still alive
    manager_alive = state is None or state.value == State.STARTED
    if not manager_alive:
        util.debug('DECREF %r -- manager already shutdown', token.id)
    else:
        # tell manager this process no longer cares about referent
        try:
            util.debug('DECREF %r', token.id)
            conn = _Client(token.address, authkey=authkey)
            dispatch(conn, None, 'decref', (token.id,))
        except Exception as exc:
            util.debug('... decref failed %s', exc)

    # check whether we can close this thread's connection because
    # the process owns no more references to objects for this manager
    if idset or not hasattr(tls, 'connection'):
        return
    util.debug('thread %r has no more proxies so closing conn',
               threading.current_thread().name)
    tls.connection.close()
    del tls.connection
899
def _after_fork(self):
    '''
    Callback registered through `util.register_after_fork`.

    Detaches the proxy from its manager object and tries to take a new
    reference; a failure is only reported through `util.info`.
    '''
    self._manager = None
    try:
        self._incref()
    except Exception as exc:
        # the proxy may just be for a manager which has shutdown
        util.info('incref failed: %s' % exc)
907
def __reduce__(self):
    '''
    Pickle support: describe the proxy as a `RebuildProxy` call.

    The authkey is included only while `get_spawning_popen()` returns
    something other than None.  Proxies flagged `_isauto` are rebuilt via
    `AutoProxy` with their exposed methods; all others via their own type.
    '''
    kwds = {}
    if get_spawning_popen() is not None:
        kwds['authkey'] = self._authkey

    if getattr(self, '_isauto', False):
        kwds['exposed'] = self._exposed_
        factory = AutoProxy
    else:
        factory = type(self)
    return (RebuildProxy, (factory, self._token, self._serializer, kwds))
920
921 def __deepcopy__(self, memo):
922 return self._getvalue()
923
924 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300925 return '<%s object, typeid %r at %#x>' % \
926 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927
928 def __str__(self):
929 '''
930 Return representation of the referent (or a fall-back if that fails)
931 '''
932 try:
933 return self._callmethod('__repr__')
934 except Exception:
935 return repr(self)[:-1] + "; '__str__()' failed>"
936
937#
938# Function used for unpickling
939#
940
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When the current process carries a `_manager_server` whose address
    matches `token.address`, the proxy is flagged `manager_owned` and the
    referent is recorded in the server's `id_to_local_proxy_obj`.  The new
    proxy takes a reference unless `kwds['incref']` is false or the
    current process has a true `_inheriting` attribute.
    '''
    this_process = process.current_process()
    server = getattr(this_process, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]
    incref = (
        kwds.pop('incref', True) and
        not getattr(process.current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000957
958#
959# Functions to create proxies and proxy types
960#
961
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` (deliberately a shared default) under
    the key `(name, tuple(exposed))`, so repeated calls return the same
    class object.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    namespace = {}
    # each exposed name becomes a method forwarding to self._callmethod
    template = '''def %s(self, *args, **kwds):
    return self._callmethod(%r, args, kwds)'''
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
982
983
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    token         -- token identifying the referent
    serializer    -- key into `listener_client` selecting the Client type
    manager       -- optional manager; supplies the authkey if none given
    authkey       -- optional authentication key
    exposed       -- method names to expose; when None they are fetched
                     from the server with a 'get_methods' request
    incref        -- forwarded to the proxy constructor
    manager_owned -- forwarded to the proxy constructor.  `RebuildProxy`
                     adds this keyword when unpickling on the manager
                     server, so it must be accepted here; without it
                     rebuilding an auto-proxy there raised TypeError.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    # fall back to the manager's key, then to the current process's key
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # lets __reduce__ know to rebuild through AutoProxy
    proxy._isauto = True
    return proxy
1008
1009#
1010# Types/callables which we will register with SyncManager
1011#
1012
class Namespace(object):
    '''
    Plain attribute container initialised from keyword arguments.
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes starting with '_' are left out; entries are sorted
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
        )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001024
class Value(object):
    '''
    Holder for a single value together with its typecode.
    '''
    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)' % (cls_name, self._typecode, self._value)

    # attribute-style access backed by get()/set()
    value = property(get, set)
1036
def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`; `lock` is accepted but unused.
    '''
    created = array.array(typecode, sequence)
    return created
1039
1040#
1041# Proxy types used by SyncManager
1042#
1043
class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding the iterator/generator protocol to the referent.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy acts as its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
1056
1057
class AcquirerProxy(BaseProxy):
    '''
    Proxy for referents exposing `acquire` and `release`; also usable as
    a context manager.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when the caller supplied one
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1069
1070
class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding `wait`, `notify`, `notify_all` and a locally evaluated
    `wait_for` on top of `AcquirerProxy`.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed.

        The predicate is called in this process; the last value it
        returned is the result.
        '''
        result = predicate()
        if result:
            return result
        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not result:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            result = predicate()
        return result
1096
Benjamin Petersone711caf2008-06-11 16:44:04 +00001097
class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait` to the referent.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
1108
Richard Oudkerk3730a172012-06-15 18:26:07 +01001109
class BarrierProxy(BaseProxy):
    '''
    Proxy forwarding `wait`, `abort` and `reset`; the read-only
    attributes are fetched through the referent's `__getattribute__`.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1127
1128
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attribute access is forwarded to the referent.

    Names starting with '_' are handled on the proxy object itself; every
    other name is read, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # fetch _callmethod without going back through __getattr__
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__setattr__', (key, value))

    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__delattr__', (key,))
1146
1147
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get` and `set`, also available as `.value`.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    # attribute-style access backed by the remote get()/set()
    value = property(get, set)
1155
1156
# Generated proxy type with one forwarding method per listed name.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.
    '''
    def __iadd__(self, value):
        # `+=` is carried out as a remote extend()
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1170
1171
# Generated proxy type with one forwarding method per listed name.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# associates the `__iter__` method with the 'Iterator' typeid
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00001180
1181
# Generated proxy type exposing only length and item access.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001185
1186
# Generated proxy type (named 'PoolProxy') with one forwarding method per
# listed name.
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# associates each listed method with the typeid named for its result
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    `terminate()`.
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001203
1204#
1205# Definition of SyncManager
1206#
1207
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The registered types are those intended for synchronizing threads,
    together with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''
1218
# registered without an explicit proxy type
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# threading primitives, each with a hand-written proxy type
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)

# pools, containers and value holders
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001239
1240#
1241# Definition of SharedMemoryManager and SharedMemoryServer
1242#
1243
1244if HAS_SHMEM:
class _SharedMemoryTracker:
    "Manages one or more shared memory segments."

    def __init__(self, name, segment_names=None):
        """Create a tracker called `name`.

        `segment_names` is the list of block names to start from; when
        omitted, a new empty list is created for this instance.
        """
        self.shared_memory_context_name = name
        # A fresh list per instance: with a shared `[]` default, every
        # tracker created without explicit names appended to one list.
        self.segment_names = [] if segment_names is None else segment_names

    def register_segment(self, segment_name):
        "Adds the supplied shared memory block name to tracker."
        util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
        self.segment_names.append(segment_name)

    def destroy_segment(self, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the list of blocks being tracked."""
        util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
        self.segment_names.remove(segment_name)
        segment = shared_memory.SharedMemory(segment_name)
        segment.close()
        segment.unlink()

    def unlink(self):
        "Calls destroy_segment() on all tracked shared memory blocks."
        # iterate over a copy: destroy_segment() mutates the list
        for segment_name in self.segment_names[:]:
            self.destroy_segment(segment_name)

    def __del__(self):
        util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
        self.unlink()

    def __getstate__(self):
        return (self.shared_memory_context_name, self.segment_names)

    def __setstate__(self, state):
        # state is the (name, segment_names) pair from __getstate__
        self.__init__(*state)
1280
1281
class SharedMemoryServer(Server):
    """Server that additionally tracks shared memory blocks by name."""

    public = Server.public + \
             ['track_segment', 'release_segment', 'list_segments']

    def __init__(self, *args, **kwargs):
        Server.__init__(self, *args, **kwargs)
        self.shared_memory_context = \
            _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
        util.debug(f"SharedMemoryServer started by pid {getpid()}")

    def create(*args, **kwargs):
        """Create a new distributed-shared object (not backed by a shared
        memory block) and return its id to be used in a Proxy Object.

        Takes the same arguments as Server.create(); the typeid is the
        third positional argument or the `typeid` keyword.  Raises
        TypeError when the instance or the typeid is missing.
        """
        # `self` arrives as args[0] because the signature is fully
        # variadic; bind it explicitly before it is used below.
        if not args:
            raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
                            "object needs an argument")
        self = args[0]
        if len(args) >= 3:
            typeid = args[2]
        elif 'typeid' in kwargs:
            typeid = kwargs['typeid']
        else:
            raise TypeError('create expected at least 2 positional '
                            'arguments, got %d' % (len(args)-1))
        # Unless set up as a shared proxy, don't make shared_memory_context
        # a standard part of kwargs.  This makes things easier for supplying
        # simple functions.
        if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
            kwargs['shared_memory_context'] = self.shared_memory_context
        return Server.create(*args, **kwargs)

    def shutdown(self, c):
        "Call unlink() on all tracked shared memory, terminate the Server."
        self.shared_memory_context.unlink()
        return Server.shutdown(self, c)

    def track_segment(self, c, segment_name):
        "Adds the supplied shared memory block name to Server's tracker."
        self.shared_memory_context.register_segment(segment_name)

    def release_segment(self, c, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the tracker instance inside the Server."""
        self.shared_memory_context.destroy_segment(segment_name)

    def list_segments(self, c):
        """Returns a list of names of shared memory blocks that the Server
        is currently tracking."""
        return self.shared_memory_context.segment_names
1331
1332
class SharedMemoryManager(BaseManager):
    """Like SyncManager but uses SharedMemoryServer instead of Server.

    Offers methods that create and return SharedMemory instances and a
    list-like object (ShareableList) backed by shared memory.  It also
    provides methods that create and return Proxy Objects that support
    synchronization across processes (i.e. multi-process-safe locks and
    semaphores).
    """

    _Server = SharedMemoryServer

    def __init__(self, *args, **kwargs):
        BaseManager.__init__(self, *args, **kwargs)
        util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

    def __del__(self):
        util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

    def get_server(self):
        'Better than monkeypatching for now; merge into Server ultimately'
        state = self._state.value
        if state != State.INITIAL:
            # a server may only be built for a manager not yet started
            if state == State.STARTED:
                problem = "Already started SharedMemoryServer"
            elif state == State.SHUTDOWN:
                problem = "SharedMemoryManager has shut down"
            else:
                problem = "Unknown state {!r}".format(state)
            raise ProcessError(problem)
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def SharedMemory(self, size):
        """Returns a new SharedMemory instance with the specified size in
        bytes, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            block = shared_memory.SharedMemory(None, create=True, size=size)
            try:
                dispatch(conn, None, 'track_segment', (block.name,))
            except BaseException as e:
                # the manager never learned about the block: unlink it here
                block.unlink()
                raise e
        return block

    def ShareableList(self, sequence):
        """Returns a new ShareableList instance populated with the values
        from the input sequence, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            shared_list = shared_memory.ShareableList(sequence)
            try:
                dispatch(conn, None, 'track_segment',
                         (shared_list.shm.name,))
            except BaseException as e:
                # the manager never learned about the block: unlink it here
                shared_list.shm.unlink()
                raise e
        return shared_list