blob: 7e1818bb0996857a7efedd1b673249c6c2e035e7 [file] [log] [blame]
#
# Module providing manager classes for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
10
# Names exported by `from multiprocessing.managers import *`.
__all__ = [
    'BaseManager',
    'SyncManager',
    'BaseProxy',
    'Token',
    'SharedMemoryManager',
]
Benjamin Petersone711caf2008-06-11 16:44:04 +000013
#
# Imports
#
17
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import threading
Pierre Glaserd0d64ad2019-05-10 20:42:35 +020020import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000022import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020023import time
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020024import os
Davin Pottse895de32019-02-23 22:08:16 -060025from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000026
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from traceback import format_exc
28
29from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050030from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031from . import pool
32from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010033from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010034from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060035try:
36 from . import shared_memory
37 HAS_SHMEM = True
38except ImportError:
39 HAS_SHMEM = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
#
# Register some things for pickling
#
44
def reduce_array(a):
    '''
    Reduce the array `a` to the `array.array` constructor plus the
    (typecode, raw bytes) arguments needed to rebuild it
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args


# Use reduce_array whenever an array.array is pickled through `reduction`.
reduction.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
# Concrete types returned by dict.items(), dict.keys() and dict.values().
view_types = [
    type(make_view()) for make_view in ({}.items, {}.keys, {}.values)
]

if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        '''
        Reduce a dictionary view to a plain list holding the same elements
        '''
        elements = list(obj)
        return list, (elements,)

    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000055
#
# Type for identifying shared objects
#
59
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is the triple (typeid, address, id); the same triple is used
    as its pickled state.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # __slots__ classes have no __dict__, so the state is spelled out.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__,
                  self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000078
#
# Function for communication with a manager's server process
#
82
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple (id, methodname, args, kwds).  A '#RETURN'
    reply yields its payload; any other reply is turned into an exception
    by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
92
def convert_to_error(kind, result):
    '''
    Build (but do not raise) the exception matching a non-'#RETURN' reply

    '#ERROR' replies already carry the exception object.  '#TRACEBACK' and
    '#UNSERIALIZABLE' replies carry text that is wrapped in a RemoteError;
    a TypeError is raised if that payload is not a str.  Any other kind
    gives a ValueError.
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000107
class RemoteError(Exception):
    '''
    Exception whose first argument is text to be shown between two rules
    '''
    def __str__(self):
        rule = '-' * 75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
111
#
# Functions for finding the method names of an object
#
115
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name counts as a method when the attribute it refers to is callable;
    names come back in dir() order.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
126
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    def is_public(name):
        return name[0] != '_'

    return list(filter(is_public, all_methods(obj)))
132
#
# Server which is run in a process controlled by a manager
#
136
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods handle_request() is willing to dispatch to.
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (initially empty) object tables

        `registry` maps each typeid to a (callable, exposed,
        method_to_typeid, proxytype) tuple, `authkey` must be bytes and
        `serializer` must be a key of `listener_client`.

        Raises TypeError if `authkey` is not a bytes instance.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener = listener_client[serializer][0]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident '0' is a placeholder entry that is never refcounted
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Starts the accepter thread, then waits until the stop event is
        set.  Always finishes by calling sys.exit(0).
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each in a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Runs the challenge exchange with `self.authkey`, reads a single
        (ignored, funcname, args, kwds) request, calls the named public
        method with the connection as first argument and sends back
        ('#RETURN', result) or ('#TRACEBACK', text).  The connection is
        closed afterwards.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit raise rather than `assert`: the name comes from the
            # client and the check must survive running under -O.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # The reply itself could not be sent; try to report that,
            # then log what happened.
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request is an
        (ident, methodname, args, kwds) tuple.  Exits the thread with
        sys.exit(0) on EOF and sys.exit(1) if a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to the stashed local-proxy table; report
                    # the original lookup failure if that misses too.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result is itself to be shared: register it
                        # and reply with a token instead of the value.
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''Return the referent itself'''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''Return str() of the referent'''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''Return repr() of the referent'''
        return repr(obj)

    # Consulted by serve_client() when a request raises AttributeError
    # (for example because the method name is not in `exposed`).
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''Do nothing'''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount.keys()):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c` and sets the stop event; the
        event is set even if the acknowledgement cannot be sent.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(*args, **kwds):
        '''
        Create a new shared object and return its id

        Called as create(self, c, typeid, *args, **kwds).  Passing `c` or
        `typeid` by keyword still works but emits a DeprecationWarning.
        Returns the pair (ident, tuple_of_exposed_method_names).

        Raises TypeError when required arguments are missing or when the
        registered method_to_typeid is not a dict, and ValueError when the
        typeid has no callable and anything other than exactly one
        positional argument is given.
        '''
        if len(args) >= 3:
            self, c, typeid, *args = args
        elif not args:
            raise TypeError("descriptor 'create' of 'Server' object "
                            "needs an argument")
        else:
            if 'typeid' not in kwds:
                raise TypeError('create expected at least 2 positional '
                                'arguments, got %d' % (len(args)-1))
            typeid = kwds.pop('typeid')
            if len(args) >= 2:
                self, c, *args = args
                import warnings
                warnings.warn("Passing 'typeid' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
            else:
                if 'c' not in kwds:
                    raise TypeError('create expected at least 2 positional '
                                    'arguments, got %d' % (len(args)-1))
                c = kwds.pop('c')
                self, *args = args
                import warnings
                warnings.warn("Passing 'c' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
        args = tuple(args)

        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # No factory registered: the caller supplies the object.
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes self.mutex itself, so call it after releasing it.
        self.incref(c, ident)
        return ident, tuple(exposed)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)'

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one reference to the object `ident`

        Raises KeyError if `ident` is neither refcounted nor present in
        the local-proxy table.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident not in self.id_to_local_proxy_obj:
                    raise
                self.id_to_refcount[ident] = 1
                self.id_to_obj[ident] = \
                    self.id_to_local_proxy_obj[ident]
                util.debug('Server re-enabled tracking & INCREF %r', ident)

    def decref(self, c, ident):
        '''
        Drop one reference to the object `ident`, disposing of the object
        when the count reaches zero

        Raises AssertionError if the recorded count is not positive.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
484
Benjamin Petersone711caf2008-06-11 16:44:04 +0000485
#
# Class to represent state of a manager
#
489
class State(object):
    '''
    Holder for a manager's lifecycle state; `value` is one of the
    INITIAL, STARTED or SHUTDOWN constants
    '''
    __slots__ = ['value']

    # Lifecycle constants
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
495
#
# Mapping from serializer name to Listener and Client types
#
499
# serializer name -> (Listener type, Client type)
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
)
504
505#
506# Definition of BaseManager
507#
508
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype); copied
    # per subclass by register() so registrations do not leak upwards.
    _registry = {}
    # Server class instantiated by get_server() and _run_server().
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection parameters; nothing is started yet

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of `listener_client` and `ctx` defaults to
        get_context().
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is still in its initial
        state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class-level hook, as _run_server() does, so a subclass
        # overriding _Server gets the same server type from both paths.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # The probe connection is not needed once the server answered.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        `initializer(*initargs)` is run in the server process before the
        server is created.  Raises ProcessError unless the manager is in
        its initial state and TypeError if `initializer` is not callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # _process only exists once start() has run; treat "never
        # started" the same as "already joined".
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        '''
        Start the manager if it is still in its initial state and return it

        Raises ProcessError if the manager is not started afterwards.
        '''
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''Run the finalizer that start() stored as `self.shutdown`'''
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: the process is joined/terminated below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        '''The address recorded for this manager'''
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and
        `method_to_typeid` fall back to the proxy type's `_exposed_` and
        `_method_to_typeid_` attributes.  When `create_method` is true a
        method named `typeid` is added to the class; it creates a shared
        object and returns a proxy for it.
        '''
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items(): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Send a decref for the new object now that the proxy has
                # been built, and do not leave the connection open.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
750
#
# Subclass of set which gets cleared after a fork
#
754
class ProcessLocalSet(set):
    '''
    Set registered with util.register_after_fork() so that it is cleared
    by the after-fork hook; reduces to an empty set of the same type
    '''
    def __init__(self):
        def _clear(instance):
            instance.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        cls = type(self)
        return cls, ()
760
#
# Definition of BaseProxy
#
764
765class BaseProxy(object):
766 '''
767 A base for proxies of shared objects
768 '''
769 _address_to_local = {}
770 _mutex = util.ForkAwareThreadLock()
771
772 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500773 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100774 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000775 tls_idset = BaseProxy._address_to_local.get(token.address, None)
776 if tls_idset is None:
777 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
778 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779
780 # self._tls is used to record the connection used by this
781 # thread to communicate with the manager at token.address
782 self._tls = tls_idset[0]
783
784 # self._idset is used to record the identities of all shared
785 # objects for which the current process owns references and
786 # which are in the manager at token.address
787 self._idset = tls_idset[1]
788
789 self._token = token
790 self._id = self._token.id
791 self._manager = manager
792 self._serializer = serializer
793 self._Client = listener_client[serializer][1]
794
Davin Potts86a76682016-09-07 18:48:01 -0500795 # Should be set to True only when a proxy object is being created
796 # on the manager server; primary use case: nested proxy objects.
797 # RebuildProxy detects when a proxy is being created on the manager
798 # and sets this value appropriately.
799 self._owned_by_manager = manager_owned
800
Benjamin Petersone711caf2008-06-11 16:44:04 +0000801 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100802 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000803 elif self._manager is not None:
804 self._authkey = self._manager._authkey
805 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100806 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000807
808 if incref:
809 self._incref()
810
811 util.register_after_fork(self, BaseProxy._after_fork)
812
813 def _connect(self):
814 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100815 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000816 if threading.current_thread().name != 'MainThread':
817 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000818 conn = self._Client(self._token.address, authkey=self._authkey)
819 dispatch(conn, None, 'accept_connection', (name,))
820 self._tls.connection = conn
821
def _callmethod(self, methodname, args=(), kwds={}):
    '''
    Try to call a method of the referent and return a copy of the result

    The request `(self._id, methodname, args, kwds)` is sent over the
    calling thread's connection, which is created on first use.  The
    reply is a `(kind, result)` pair:

    * '#RETURN' -- `result` is returned as is.
    * '#PROXY'  -- `result` is `(exposed, token)`; a proxy of the type
      registered for `token.typeid` on `self._manager` is built and
      returned.
    * anything else -- the exception built by
      `convert_to_error(kind, result)` is raised.
    '''
    try:
        conn = self._tls.connection
    except AttributeError:
        util.debug('thread %r does not own a connection',
                   threading.current_thread().name)
        self._connect()
        conn = self._tls.connection

    conn.send((self._id, methodname, args, kwds))
    kind, result = conn.recv()

    if kind == '#RETURN':
        return result
    elif kind == '#PROXY':
        exposed, token = result
        proxytype = self._manager._registry[token.typeid][-1]
        # The new proxy must talk to the same address this proxy uses.
        token.address = self._token.address
        proxy = proxytype(
            token, self._serializer, manager=self._manager,
            authkey=self._authkey, exposed=exposed
            )
        # A separate one-shot connection sends 'decref' for the new token.
        # NOTE(review): presumably this drops a reference the manager held
        # on our behalf while the proxy was being built -- confirm against
        # the server side.  Close the connection explicitly instead of
        # leaving it to garbage collection.
        decref_conn = self._Client(token.address, authkey=self._authkey)
        try:
            dispatch(decref_conn, None, 'decref', (token.id,))
        finally:
            decref_conn.close()
        return proxy
    raise convert_to_error(kind, result)
851
def _getvalue(self):
    '''
    Return whatever the '#GETVALUE' request yields (a copy of the referent)
    '''
    referent_copy = self._callmethod('#GETVALUE')
    return referent_copy
857
def _incref(self):
    '''
    Tell the manager that this proxy references the object `self._id`

    Nothing is sent when the proxy is owned by the manager.  Otherwise an
    'incref' request is made over a one-shot connection, the id is added
    to `self._idset`, and a finalizer calling `BaseProxy._decref` is
    stored as `self._close`.
    '''
    if self._owned_by_manager:
        util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
        return

    conn = self._Client(self._token.address, authkey=self._authkey)
    try:
        dispatch(conn, None, 'incref', (self._id,))
    finally:
        # The connection serves this single request only; close it
        # explicitly rather than relying on garbage collection.
        conn.close()
    util.debug('INCREF %r', self._token.id)

    self._idset.add(self._id)

    # `state` is falsy (e.g. None) when the proxy has no manager.
    state = self._manager and self._manager._state

    self._close = util.Finalize(
        self, BaseProxy._decref,
        args=(self._token, self._authkey, state,
              self._tls, self._idset, self._Client),
        exitpriority=10
        )
877
@staticmethod
def _decref(token, authkey, state, tls, idset, _Client):
    '''
    Release the reference to `token.id` recorded in `idset`

    A 'decref' request is sent over a one-shot connection unless `state`
    says the manager is no longer started; failures of that request are
    only logged.  When `idset` becomes empty, the connection stored on
    `tls` (if any) is closed and removed.
    '''
    idset.discard(token.id)

    # check whether manager is still alive
    if state is None or state.value == State.STARTED:
        # tell manager this process no longer cares about referent
        try:
            util.debug('DECREF %r', token.id)
            conn = _Client(token.address, authkey=authkey)
            try:
                dispatch(conn, None, 'decref', (token.id,))
            finally:
                # one-shot connection: close it explicitly instead of
                # leaving it to garbage collection
                conn.close()
        except Exception as e:
            util.debug('... decref failed %s', e)

    else:
        util.debug('DECREF %r -- manager already shutdown', token.id)

    # check whether we can close this thread's connection because
    # the process owns no more references to objects for this manager
    if not idset and hasattr(tls, 'connection'):
        util.debug('thread %r has no more proxies so closing conn',
                   threading.current_thread().name)
        tls.connection.close()
        del tls.connection
902
def _after_fork(self):
    '''
    Forget the manager and try to register a reference again; a failing
    incref is logged, not raised
    '''
    self._manager = None
    try:
        self._incref()
    except Exception as exc:
        # the proxy may just be for a manager which has shutdown
        util.info('incref failed: %s' % (exc,))
910
def __reduce__(self):
    '''
    Pickle support: return `(RebuildProxy, (factory, token, serializer,
    kwds))`
    '''
    extra = {}
    # The authkey is only included while a spawning Popen is active.
    if get_spawning_popen() is not None:
        extra['authkey'] = self._authkey

    # Auto-proxies are rebuilt through AutoProxy with their exposed
    # methods; every other proxy is rebuilt through its own type.
    if getattr(self, '_isauto', False):
        extra['exposed'] = self._exposed_
        factory = AutoProxy
    else:
        factory = type(self)
    return (RebuildProxy, (factory, self._token, self._serializer, extra))
923
def __deepcopy__(self, memo):
    # Deep-copying a proxy yields the result of `_getvalue()`; `memo` is
    # not consulted.
    referent_copy = self._getvalue()
    return referent_copy
926
def __repr__(self):
    # Shows the proxy class name, the token's typeid and the proxy's id().
    cls_name = type(self).__name__
    typeid = self._token.typeid
    return '<%s object, typeid %r at %#x>' % (cls_name, typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000930
def __str__(self):
    '''
    Return representation of the referent (or a fall-back if that fails)
    '''
    try:
        text = self._callmethod('__repr__')
    except Exception:
        # Reuse repr(self) without its closing '>' and append a marker.
        text = repr(self)[:-1] + "; '__str__()' failed>"
    return text
939
940#
941# Function used for unpickling
942#
943
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Calls `func(token, serializer, incref=..., **kwds)`.  When the current
    process has a `_manager_server` whose address equals `token.address`,
    the proxy is marked `manager_owned` and the referent is recorded in
    the server's `id_to_local_proxy_obj`.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]
    # No incref is requested when the caller opted out or the process is
    # flagged as `_inheriting`.
    requested = kwds.pop('incref', True)
    incref = requested and not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000960
961#
962# Functions to create proxies and proxy types
963#
964
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` under the key `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    namespace = {}
    # Each exposed name becomes a method forwarding to `_callmethod`.
    for method_name in exposed:
        source = '''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (method_name, method_name)
        exec(source, namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
985
986
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None, the method names are fetched with a
    'get_methods' request.  A missing `authkey` falls back to
    `manager._authkey` and then to the current process's authkey.
    `manager_owned` is forwarded to the proxy type; `RebuildProxy` passes
    it when an auto-proxy is rebuilt.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key before connecting so the 'get_methods' query uses
    # the same credentials the proxy itself will use.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # Marker read by `__reduce__` to pickle this proxy via AutoProxy.
    proxy._isauto = True
    return proxy
1011
1012#
1013# Types/callables which we will register with SyncManager
1014#
1015
class Namespace(object):
    '''Simple attribute container; keyword arguments become attributes.'''

    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # Attributes whose name starts with '_' are left out; the rest are
        # shown as sorted `name=value` pairs.
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
        )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027
class Value(object):
    '''Holder for a typecode and a single value.'''

    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used here.
        self._value = value
        self._typecode = typecode

    def get(self):
        '''Return the stored value.'''
        return self._value

    def set(self, value):
        '''Replace the stored value.'''
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)'%(cls_name, self._typecode, self._value)

    # Attribute-style access built on get()/set().
    value = property(get, set)
1039
def Array(typecode, sequence, lock=True):
    '''Return `array.array(typecode, sequence)`; `lock` is not used.'''
    created = array.array(typecode, sequence)
    return created
1042
1043#
1044# Proxy types used by SyncManager
1045#
1046
class IteratorProxy(BaseProxy):
    '''Proxy forwarding the iterator/generator protocol via `_callmethod`.'''

    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self, *call_args):
        return self._callmethod('__next__', call_args)

    def send(self, *call_args):
        return self._callmethod('send', call_args)

    def throw(self, *call_args):
        return self._callmethod('throw', call_args)

    def close(self, *call_args):
        return self._callmethod('close', call_args)
1059
1060
class AcquirerProxy(BaseProxy):
    '''Proxy for objects with acquire()/release(); usable in `with`.'''

    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when one was given.
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1072
1073
class ConditionProxy(AcquirerProxy):
    '''Proxy adding wait/notify/notify_all and a local wait_for().'''

    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call wait() repeatedly until `predicate()` is true or `timeout`
        seconds have elapsed; return the last value of `predicate()`.
        The predicate is evaluated in the calling process.
        '''
        outcome = predicate()
        if outcome:
            return outcome
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            remaining = None
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    # out of time: hand back the last (false) outcome
                    return outcome
            self.wait(remaining)
            outcome = predicate()
            if outcome:
                return outcome
1099
Benjamin Petersone711caf2008-06-11 16:44:04 +00001100
class EventProxy(BaseProxy):
    '''Proxy forwarding is_set/set/clear/wait via `_callmethod`.'''

    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        flag = self._callmethod('is_set')
        return flag

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)
1111
Richard Oudkerk3730a172012-06-15 18:26:07 +01001112
class BarrierProxy(BaseProxy):
    '''
    Proxy forwarding wait/abort/reset; `parties`, `n_waiting` and `broken`
    are read through the referent's `__getattribute__`.
    '''

    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        attr = ('parties',)
        return self._callmethod('__getattribute__', attr)

    @property
    def n_waiting(self):
        attr = ('n_waiting',)
        return self._callmethod('__getattribute__', attr)

    @property
    def broken(self):
        attr = ('broken',)
        return self._callmethod('__getattribute__', attr)
1130
1131
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attributes live on the referent: names starting with '_'
    are handled locally, every other name is forwarded via `_callmethod`.
    '''

    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__delattr__', (key,))
        return object.__delattr__(self, key)
1149
1150
class ValueProxy(BaseProxy):
    '''Proxy forwarding get()/set(); `value` is a property over both.'''

    _exposed_ = ('get', 'set')

    def get(self):
        current = self._callmethod('get')
        return current

    def set(self, value):
        call_args = (value,)
        return self._callmethod('set', call_args)

    value = property(get, set)
1158
1159
# Generated base type: every listed name forwards to `_callmethod`.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''List proxy whose in-place operators return the proxy itself.'''

    def __iadd__(self, value):
        # `proxy += value` is carried out as an 'extend' on the referent.
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # Forward '__imul__' but keep the name bound to this proxy.
        self._callmethod('__imul__', (value,))
        return self
1173
1174
# Generated dict proxy type: every listed name forwards to `_callmethod`.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# The result of '__iter__' is mapped to the 'Iterator' typeid.
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00001183
1184
# Generated array proxy type exposing length and item access only.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001188
1189
# Generated base type; note that its type name is 'PoolProxy'.
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# Typeids used for the results of the listed methods.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''Pool proxy usable as a context manager; leaving calls terminate().'''

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001206
1207#
1208# Definition of SyncManager
1209#
1210
class SyncManager(BaseManager):
    '''
    `BaseManager` subclass on which a number of shared object types are
    registered.

    The registered types are those intended for the synchronization of
    threads, together with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''
1221
# Queues: registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# Synchronization primitives from `threading`, each with its proxy type.
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)

# Worker pool.
SyncManager.register('Pool', pool.Pool, PoolProxy)

# Containers and value holders.
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001242
1243#
1244# Definition of SharedMemoryManager and SharedMemoryServer
1245#
1246
1247if HAS_SHMEM:
class _SharedMemoryTracker:
    "Manages one or more shared memory segments."

    def __init__(self, name, segment_names=None):
        """Create a tracker called `name`.

        `segment_names` is the list of block names to start with; when it
        is omitted each tracker gets its own new empty list.  A list that
        is passed in is used as is, not copied.
        """
        self.shared_memory_context_name = name
        # A `[]` default would be one list shared by every tracker built
        # without `segment_names`, and register_segment() appends to it.
        self.segment_names = [] if segment_names is None else segment_names

    def register_segment(self, segment_name):
        "Adds the supplied shared memory block name to tracker."
        util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
        self.segment_names.append(segment_name)

    def destroy_segment(self, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the list of blocks being tracked."""
        util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
        self.segment_names.remove(segment_name)
        segment = shared_memory.SharedMemory(segment_name)
        segment.close()
        segment.unlink()

    def unlink(self):
        "Calls destroy_segment() on all tracked shared memory blocks."
        # Iterate over a copy: destroy_segment() removes names from the
        # list while we loop.
        for segment_name in self.segment_names[:]:
            self.destroy_segment(segment_name)

    def __del__(self):
        util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
        self.unlink()

    def __getstate__(self):
        return (self.shared_memory_context_name, self.segment_names)

    def __setstate__(self, state):
        # `state` is the (name, segment_names) pair from __getstate__().
        self.__init__(*state)
1283
1284
class SharedMemoryServer(Server):
    """Server that also keeps a `_SharedMemoryTracker` of shared memory
    block names and exposes 'track_segment', 'release_segment' and
    'list_segments' as public requests."""

    public = Server.public + \
             ['track_segment', 'release_segment', 'list_segments']

    def __init__(self, *args, **kwargs):
        Server.__init__(self, *args, **kwargs)
        self.shared_memory_context = \
            _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
        util.debug(f"SharedMemoryServer started by pid {getpid()}")

    def create(*args, **kwargs):
        """Create a new distributed-shared object (not backed by a shared
        memory block) and return its id to be used in a Proxy Object.

        Called as `create(self, c, typeid, *args, **kwargs)`; `typeid` may
        also be supplied as a keyword.  Raises TypeError when `self` or
        `typeid` is missing.
        """
        # `self` arrives as the first positional argument because the
        # signature is deliberately `*args, **kwargs`.
        if not args:
            raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
                            "object needs an argument")
        self = args[0]
        if len(args) >= 3:
            typeid = args[2]
        elif 'typeid' in kwargs:
            typeid = kwargs['typeid']
        else:
            raise TypeError('create expected at least 2 positional '
                            'arguments, got %d' % (len(args)-1))
        # Unless set up as a shared proxy, don't make shared_memory_context
        # a standard part of kwargs.  This makes things easier for supplying
        # simple functions.
        if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
            kwargs['shared_memory_context'] = self.shared_memory_context
        return Server.create(*args, **kwargs)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'

    def shutdown(self, c):
        "Call unlink() on all tracked shared memory, terminate the Server."
        self.shared_memory_context.unlink()
        return Server.shutdown(self, c)

    def track_segment(self, c, segment_name):
        "Adds the supplied shared memory block name to Server's tracker."
        self.shared_memory_context.register_segment(segment_name)

    def release_segment(self, c, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the tracker instance inside the Server."""
        self.shared_memory_context.destroy_segment(segment_name)

    def list_segments(self, c):
        """Returns a list of names of shared memory blocks that the Server
        is currently tracking."""
        return self.shared_memory_context.segment_names
1335
1336
class SharedMemoryManager(BaseManager):
    """Like SyncManager but uses SharedMemoryServer instead of Server.

    Offers methods that create and return SharedMemory instances and a
    list-like object (ShareableList) backed by shared memory.  It also
    offers methods that create and return Proxy Objects supporting
    synchronization across processes (i.e. multi-process-safe locks and
    semaphores).
    """

    _Server = SharedMemoryServer

    def __init__(self, *args, **kwargs):
        if os.name == "posix":
            # bpo-36867: Ensure the resource_tracker is running before
            # launching the manager process, so that concurrent
            # shared_memory manipulation both in the manager and in the
            # current process does not create two resource_tracker
            # processes.
            from . import resource_tracker
            resource_tracker.ensure_running()
        BaseManager.__init__(self, *args, **kwargs)
        util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

    def __del__(self):
        # Only logs; nothing is released here.
        util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

    def get_server(self):
        'Better than monkeypatching for now; merge into Server ultimately'
        current = self._state.value
        if current == State.INITIAL:
            return self._Server(self._registry, self._address,
                                self._authkey, self._serializer)
        # Any other state is an error; pick the matching message.
        if current == State.STARTED:
            raise ProcessError("Already started SharedMemoryServer")
        if current == State.SHUTDOWN:
            raise ProcessError("SharedMemoryManager has shut down")
        raise ProcessError("Unknown state {!r}".format(current))

    def SharedMemory(self, size):
        """Returns a new SharedMemory instance with the specified size in
        bytes, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            segment = shared_memory.SharedMemory(None, create=True, size=size)
            try:
                dispatch(conn, None, 'track_segment', (segment.name,))
            except BaseException as exc:
                # The block could not be handed to the manager: unlink it
                # before re-raising.
                segment.unlink()
                raise exc
        return segment

    def ShareableList(self, sequence):
        """Returns a new ShareableList instance populated with the values
        from the input sequence, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            shareable = shared_memory.ShareableList(sequence)
            try:
                dispatch(conn, None, 'track_segment', (shareable.shm.name,))
            except BaseException as exc:
                # Same clean-up as in SharedMemory(): unlink the backing
                # block before re-raising.
                shareable.shm.unlink()
                raise exc
        return shareable
1400 return sl