blob: 75b5150f821b1cd770870e1b023ea6c80be39761 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Davin Pottse895de32019-02-23 22:08:16 -06002# Module providing manager classes for dealing
Benjamin Petersone711caf2008-06-11 16:44:04 +00003# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Davin Pottse895de32019-02-23 22:08:16 -060011__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12 'SharedMemoryManager' ]
Benjamin Petersone711caf2008-06-11 16:44:04 +000013
14#
15# Imports
16#
17
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import threading
Pierre Glaserd0d64ad2019-05-10 20:42:35 +020020import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000022import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020023import time
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020024import os
Davin Pottse895de32019-02-23 22:08:16 -060025from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000026
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from traceback import format_exc
28
29from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050030from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031from . import pool
32from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010033from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010034from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060035try:
36 from . import shared_memory
37 HAS_SHMEM = True
38except ImportError:
39 HAS_SHMEM = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
Benjamin Petersone711caf2008-06-11 16:44:04 +000041#
Benjamin Petersone711caf2008-06-11 16:44:04 +000042# Register some things for pickling
43#
44
def reduce_array(a):
    '''
    Reduction function for `array.array` objects

    Returns the pair `(array.array, (typecode, data))`, where `data` is the
    byte representation of `a`; calling the first item with the second as
    arguments rebuilds an equal array.
    '''
    constructor_args = (a.typecode, a.tobytes())
    return array.array, constructor_args
# Register reduce_array as the reduction function for array.array
reduction.register(array.array, reduce_array)

# Concrete types of the objects returned by dict.items(), dict.keys() and
# dict.values(), in that order
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # only needed when views are not lists
    def rebuild_as_list(obj):
        # Reduce a dict view to a plain list holding the same elements
        contents = list(obj)
        return list, (contents,)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000055
56#
57# Type for identifying shared objects
58#
59
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is the triple `(typeid, address, id)`; the same triple is used
    as its pickled state.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # State is the plain field tuple, in __slots__ order
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        # Unpack first so a malformed state leaves the token untouched
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000078
79#
80# Function for communication with a manager's server process
81#
82
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request sent is the tuple `(id, methodname, args, kwds)`.  A
    '#RETURN' reply yields its payload; any other reply kind is turned into
    an exception by `convert_to_error()` and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply_kind, payload = c.recv()
    if reply_kind != '#RETURN':
        raise convert_to_error(reply_kind, payload)
    return payload
92
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into the exception object to raise

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies must carry a str and become
    a RemoteError.  Any other kind yields a ValueError.  The exception is
    returned, not raised; only a non-str payload raises (TypeError).
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000107
class RemoteError(Exception):
    '''
    Exception wrapping text received in a '#TRACEBACK' or
    '#UNSERIALIZABLE' reply (see convert_to_error)

    `str()` frames the first argument between two rules of 75 dashes.
    '''
    def __str__(self):
        rule = '-'*75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
111
112#
113# Functions for finding the method names of an object
114#
115
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name qualifies when it is listed by `dir(obj)` and the attribute it
    refers to is callable.  Names keep the order `dir()` reports them in.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
126
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            # private or special name: not part of the public interface
            continue
        public.append(name)
    return public
132
133#
134# Server which is run in a process controlled by a manager
135#
136
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    The server keeps three tables keyed by object id (a hex string):
    `id_to_obj` maps to `(obj, exposed, method_to_typeid)`, `id_to_refcount`
    maps to the number of references held by clients, and
    `id_to_local_proxy_obj` is consulted as a fallback when an id is missing
    from the other two.
    '''
    # Names of the methods a client may invoke through handle_request()
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (empty) object-tracking tables

        `registry` maps typeids to `(callable, exposed, method_to_typeid,
        proxytype)` tuples, `serializer` is a key of `listener_client` and
        `authkey` must be bytes (TypeError otherwise).
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, _ = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Starts a daemon thread accepting connections, then blocks until
        `stop_event` is set.  Always ends by calling `sys.exit(0)`.
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one `(ignore, funcname, args, kwds)`
        request, runs the named public method and sends back either
        `('#RETURN', result)` or `('#TRACEBACK', text)`.  The connection is
        closed on every exit path.
        '''
        funcname = result = request = None
        try:
            try:
                connection.deliver_challenge(c, self.authkey)
                connection.answer_challenge(c, self.authkey)
                request = c.recv()
                ignore, funcname, args, kwds = request
                # Explicit check rather than `assert`: the whitelist must
                # still be enforced when Python runs with -O.
                if funcname not in self.public:
                    raise AssertionError('%r unrecognized' % funcname)
                func = getattr(self, funcname)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                try:
                    result = func(c, *args, **kwds)
                except Exception:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    msg = ('#RETURN', result)
            try:
                c.send(msg)
            except Exception as e:
                try:
                    c.send(('#TRACEBACK', format_exc()))
                except Exception:
                    pass
                util.info('Failure to send message: %r', msg)
                util.info(' ... request was %r', request)
                util.info(' ... exception was %r', e)
        finally:
            # Also reached when serve_client() leaves via sys.exit(), which
            # raises SystemExit and is not caught by `except Exception`.
            c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `stop_event` is set.  Each request is a tuple
        `(ident, methodname, args, kwds)`; the reply is one of '#RETURN',
        '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.  The thread
        exits via `sys.exit()` on EOF or when a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        # report the failure of the primary lookup
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result is itself shared: reply with a token
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # method not exposed/available: try the fallbacks
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        # '#GETVALUE': hand back the referent itself
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Used by serve_client() when the requested method raised AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c` and sets `stop_event`, even when the
        acknowledgement could not be sent.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, /, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)`, where `exposed` is the tuple of method
        names a proxy may call.  The new object starts with one reference.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # no callable registered: the caller supplies the object
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the (non-reentrant) mutex itself, so it has to be
        # called after the lock has been released
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one reference to the object `ident`

        Raises KeyError when `ident` is tracked neither in `id_to_refcount`
        nor in `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise

    def decref(self, c, ident):
        '''
        Drop one reference to the object `ident`; dispose of the object
        once no references remain

        Raises AssertionError if the recorded refcount is not positive.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
457
Benjamin Petersone711caf2008-06-11 16:44:04 +0000458
459#
460# Class to represent state of a manager
461#
462
class State(object):
    '''
    Mutable holder for the lifecycle state of a manager

    The single slot `value` is expected to hold one of the constants
    below.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = range(3)
468
469#
470# Mapping from serializer name to Listener and Client types
471#
472
# Each serializer name selects the (Listener, Client) pair used to talk to
# a manager server
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
477
478#
479# Definition of BaseManager
480#
481
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns its own server process (`start()`) or attaches
    to an existing one (`connect()`).  Shared types are added with the
    `register()` classmethod.
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    # Server class instantiated by get_server() and _run_server()
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        `authkey` defaults to the current process's authkey and `ctx` to
        the default multiprocessing context.  `serializer` must be a key of
        `listener_client`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is still in its initial
        state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class's _Server, like _run_server() does, so that a
        # subclass overriding _Server gets its own server type here too.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        try:
            # 'dummy' does nothing: this only checks the server is reachable
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If given, `initializer(*initargs)` is called in the server process
        before the server is created.  Raises ProcessError unless the
        manager is in its initial state, TypeError if `initializer` is not
        callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        try:
            self._address = reader.recv()
        finally:
            reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # _process only exists once start() has run; a manager that was
        # never started (or only connect()ed) has nothing to join
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        # Start the server on entry unless that already happened
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join/terminate below
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and `method_to_typeid`
        fall back to the proxy type's `_exposed_` and `_method_to_typeid_`
        attributes.  When `create_method` is true, a method named `typeid`
        is added to the class; calling it creates a shared object on the
        server and returns a proxy for it.
        '''
        if '_registry' not in cls.__dict__:
            # give this class its own registry instead of mutating the
            # one inherited from a base class
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items(): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Release the reference taken by the server's create() now
                # that the proxy has been constructed.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
722 setattr(cls, typeid, temp)
723
724#
725# Subclass of set which get cleared after a fork
726#
727
class ProcessLocalSet(set):
    '''
    Set registered to be cleared after a fork; it reduces to an empty set
    of the same type, so its contents are never pickled
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        cls = type(self)
        return cls, ()
733
734#
735# Definition of BaseProxy
736#
737
738class BaseProxy(object):
739 '''
740 A base for proxies of shared objects
741 '''
742 _address_to_local = {}
743 _mutex = util.ForkAwareThreadLock()
744
def __init__(self, token, serializer, manager=None,
             authkey=None, exposed=None, incref=True, manager_owned=False):
    '''
    Set up a proxy for the shared object identified by `token`

    `serializer` is a key of `listener_client`.  The authkey is taken from
    `authkey`, else from `manager`, else from the current process.  When
    `incref` is true the proxy registers a reference via `_incref()`.
    '''
    address = token.address
    with BaseProxy._mutex:
        try:
            tls_idset = BaseProxy._address_to_local[address]
        except KeyError:
            # first proxy for this address
            tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
            BaseProxy._address_to_local[address] = tls_idset

    # self._tls is used to record the connection used by this
    # thread to communicate with the manager at token.address
    #
    # self._idset is used to record the identities of all shared
    # objects for which the current process owns references and
    # which are in the manager at token.address
    self._tls, self._idset = tls_idset

    self._token = token
    self._id = self._token.id
    self._manager = manager
    self._serializer = serializer
    self._Client = listener_client[serializer][1]

    # Should be set to True only when a proxy object is being created
    # on the manager server; primary use case: nested proxy objects.
    # RebuildProxy detects when a proxy is being created on the manager
    # and sets this value appropriately.
    self._owned_by_manager = manager_owned

    if authkey is None:
        if self._manager is None:
            self._authkey = process.current_process().authkey
        else:
            self._authkey = self._manager._authkey
    else:
        self._authkey = process.AuthenticationString(authkey)

    if incref:
        self._incref()

    util.register_after_fork(self, BaseProxy._after_fork)
784 util.register_after_fork(self, BaseProxy._after_fork)
785
def _connect(self):
    '''
    Open this thread's connection to the manager

    The connection is announced to the server under the process name,
    extended with '|<thread name>' outside the main thread, and stored as
    `self._tls.connection`.
    '''
    util.debug('making connection to manager')
    name = process.current_process().name
    thread_name = threading.current_thread().name
    if thread_name != 'MainThread':
        name = name + '|' + thread_name
    conn = self._Client(self._token.address, authkey=self._authkey)
    dispatch(conn, None, 'accept_connection', (name,))
    self._tls.connection = conn
794
def _callmethod(self, methodname, args=(), kwds={}):
    '''
    Try to call a method of the referent and return a copy of the result

    The request goes over this thread's connection, which is created on
    first use.  A '#RETURN' reply yields its payload.  A '#PROXY' reply
    yields a new proxy built with the proxy type registered for the
    token's typeid.  Any other reply is raised as the exception produced
    by `convert_to_error()`.
    '''
    try:
        conn = self._tls.connection
    except AttributeError:
        util.debug('thread %r does not own a connection',
                   threading.current_thread().name)
        self._connect()
        conn = self._tls.connection

    conn.send((self._id, methodname, args, kwds))
    kind, result = conn.recv()

    if kind == '#RETURN':
        return result
    elif kind == '#PROXY':
        exposed, token = result
        proxytype = self._manager._registry[token.typeid][-1]
        token.address = self._token.address
        proxy = proxytype(
            token, self._serializer, manager=self._manager,
            authkey=self._authkey, exposed=exposed
            )
        # Drop one reference on the server now that the proxy exists.
        # This uses a separate one-shot connection (not the thread's
        # long-lived one) and closes it once the reply has arrived.
        decref_conn = self._Client(token.address, authkey=self._authkey)
        try:
            dispatch(decref_conn, None, 'decref', (token.id,))
        finally:
            decref_conn.close()
        return proxy
    raise convert_to_error(kind, result)
824
825 def _getvalue(self):
826 '''
827 Get a copy of the value of the referent
828 '''
829 return self._callmethod('#GETVALUE')
830
831 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500832 if self._owned_by_manager:
833 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
834 return
835
Benjamin Petersone711caf2008-06-11 16:44:04 +0000836 conn = self._Client(self._token.address, authkey=self._authkey)
837 dispatch(conn, None, 'incref', (self._id,))
838 util.debug('INCREF %r', self._token.id)
839
840 self._idset.add(self._id)
841
842 state = self._manager and self._manager._state
843
844 self._close = util.Finalize(
845 self, BaseProxy._decref,
846 args=(self._token, self._authkey, state,
847 self._tls, self._idset, self._Client),
848 exitpriority=10
849 )
850
851 @staticmethod
852 def _decref(token, authkey, state, tls, idset, _Client):
853 idset.discard(token.id)
854
855 # check whether manager is still alive
856 if state is None or state.value == State.STARTED:
857 # tell manager this process no longer cares about referent
858 try:
859 util.debug('DECREF %r', token.id)
860 conn = _Client(token.address, authkey=authkey)
861 dispatch(conn, None, 'decref', (token.id,))
862 except Exception as e:
863 util.debug('... decref failed %s', e)
864
865 else:
866 util.debug('DECREF %r -- manager already shutdown', token.id)
867
868 # check whether we can close this thread's connection because
869 # the process owns no more references to objects for this manager
870 if not idset and hasattr(tls, 'connection'):
871 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000872 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000873 tls.connection.close()
874 del tls.connection
875
876 def _after_fork(self):
877 self._manager = None
878 try:
879 self._incref()
880 except Exception as e:
881 # the proxy may just be for a manager which has shutdown
882 util.info('incref failed: %s' % e)
883
884 def __reduce__(self):
885 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500886 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000887 kwds['authkey'] = self._authkey
888
889 if getattr(self, '_isauto', False):
890 kwds['exposed'] = self._exposed_
891 return (RebuildProxy,
892 (AutoProxy, self._token, self._serializer, kwds))
893 else:
894 return (RebuildProxy,
895 (type(self), self._token, self._serializer, kwds))
896
897 def __deepcopy__(self, memo):
898 return self._getvalue()
899
900 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300901 return '<%s object, typeid %r at %#x>' % \
902 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000903
904 def __str__(self):
905 '''
906 Return representation of the referent (or a fall-back if that fails)
907 '''
908 try:
909 return self._callmethod('__repr__')
910 except Exception:
911 return repr(self)[:-1] + "; '__str__()' failed>"
912
913#
914# Function used for unpickling
915#
916
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Calls `func(token, serializer, incref=..., **kwds)` and returns the result.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        # we are inside the manager server that owns the referent
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # incref unless the caller asked not to, or the current process is
    # flagged as `_inheriting`
    wants_incref = kwds.pop('incref', True)
    incref = wants_incref and not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000933
934#
935# Functions to create proxies and proxy types
936#
937
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoized in `_cache` (deliberately a shared default) under the
    key `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    namespace = {}
    # source of one forwarding method; filled in with the method name twice
    template = '''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
958
959
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    `serializer` selects the client factory from `listener_client`.
    `authkey` defaults to the manager's key when `manager` is given, else to
    the current process's key.  When `exposed` is None the method names are
    fetched from the manager with a 'get_methods' request.  `incref` and
    `manager_owned` are passed on to the generated proxy type;
    `manager_owned` is accepted because `RebuildProxy()` supplies it when
    it rebuilds a proxy through this function.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before connecting so that the 'get_methods'
    # request below is made with the same key the proxy will use.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marks the proxy so that __reduce__ pickles it as an auto-proxy
    proxy._isauto = True
    return proxy
984
985#
986# Types/callables which we will register with SyncManager
987#
988
class Namespace(object):
    '''
    Plain attribute container initialised from keyword arguments
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes whose name starts with '_' are left out; the rest are
        # shown as sorted "name=value" pairs
        shown = sorted(
            '%s=%r' % (attr, val)
            for attr, val in list(self.__dict__.items())
            if not attr.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001000
class Value(object):
    '''
    Holder for a single value together with its typecode

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        return f'{type(self).__name__}({self._typecode!r}, {self._value!r})'

    # attribute-style access to the same getter and setter
    value = property(get, set)
1012
def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`; `lock` is accepted but not used
    '''
    contents = array.array(typecode, sequence)
    return contents
1015
1016#
1017# Proxy types used by SyncManager
1018#
1019
class IteratorProxy(BaseProxy):
    '''
    Proxy exposing `__next__`, `send`, `throw` and `close`

    Each of those methods forwards its positional arguments to the method of
    the same name on the referent via `_callmethod`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        # the proxy is its own iterator; no request is sent to the manager
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)
1032
1033
class AcquirerProxy(BaseProxy):
    '''
    Proxy exposing `acquire` and `release`; also usable in a `with` statement
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when the caller supplied one
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # acquires with the referent's default arguments
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1045
1046
class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding `wait`, `notify` and `notify_all` to `AcquirerProxy`, plus a
    `wait_for` helper that runs in the calling process
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call `wait()` repeatedly until `predicate()` is true or `timeout`
        seconds have passed; return the last value of `predicate()`
        '''
        result = predicate()
        if result:
            return result
        # `predicate` is evaluated locally; only `wait()` goes to the manager
        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not result:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            result = predicate()
        return result
1072
Benjamin Petersone711caf2008-06-11 16:44:04 +00001073
class EventProxy(BaseProxy):
    '''
    Proxy exposing `is_set`, `set`, `clear` and `wait`

    Every method forwards to the method of the same name on the referent.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        # `timeout` is always forwarded, including the default None
        return self._callmethod('wait', (timeout,))
1084
Richard Oudkerk3730a172012-06-15 18:26:07 +01001085
class BarrierProxy(BaseProxy):
    '''
    Proxy exposing `wait`, `abort` and `reset`, and the read-only properties
    `parties`, `n_waiting` and `broken`
    '''
    # '__getattribute__' is exposed so the properties below can read
    # attributes of the referent
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))
    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))
    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1103
1104
class NamespaceProxy(BaseProxy):
    '''
    Proxy that maps attribute access onto the referent

    Attribute names starting with '_' are handled on the proxy object
    itself; all other names are forwarded to the referent's
    `__getattribute__`, `__setattr__` or `__delattr__`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    def __getattr__(self, key):
        # Python only calls __getattr__ after normal lookup has failed
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # fetch _callmethod through object.__getattribute__ so the lookup
        # does not go through the hooks defined in this class
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))
    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1122
1123
class ValueProxy(BaseProxy):
    '''
    Proxy exposing `get` and `set`, also available as the `value` property
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    # reading or assigning `proxy.value` calls get() / set() above
    value = property(get, set)
1131
1132
# Generated proxy type: each listed name becomes a method that forwards to
# the referent through `_callmethod`.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `proxy += value` is carried out as a remote extend(); returning
        # self keeps the name bound to the proxy
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        # overrides the generated __imul__ so that the proxy, not the result
        # of the remote call, is returned
        self._callmethod('__imul__', (value,))
        return self
1146
1147
# Generated proxy type forwarding the listed dict methods to the referent.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
# NOTE(review): presumably makes the server hand back the result of
# `__iter__` as a proxy of typeid 'Iterator' instead of a copy -- confirm
# against the Server code that reads `_method_to_typeid_`.
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }
Benjamin Petersone711caf2008-06-11 16:44:04 +00001156
1157
# Generated proxy type: only length and item get/set are forwarded.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001161
1162
# Generated proxy type forwarding the listed pool methods to the referent.
# Note that the generated class is named 'PoolProxy', not 'BasePoolProxy'.
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
# NOTE(review): presumably maps each method to the typeid of the proxy the
# server returns for its result -- confirm against the Server code that
# reads `_method_to_typeid_`.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable in a `with` statement; leaving the block calls
    `terminate()`
    '''
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001179
1180#
1181# Definition of SyncManager
1182#
1183
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# Each call registers a typeid, the callable that creates the referent and,
# where given, the proxy type used for it.
# 'Queue' and 'JoinableQueue' are both backed by queue.Queue and are
# registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy ('Iterator' is also named by
# DictProxy for `__iter__`); registered with create_method=False
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001215
1216#
1217# Definition of SharedMemoryManager and SharedMemoryServer
1218#
1219
1220if HAS_SHMEM:
1221 class _SharedMemoryTracker:
1222 "Manages one or more shared memory segments."
1223
1224 def __init__(self, name, segment_names=[]):
1225 self.shared_memory_context_name = name
1226 self.segment_names = segment_names
1227
1228 def register_segment(self, segment_name):
1229 "Adds the supplied shared memory block name to tracker."
1230 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1231 self.segment_names.append(segment_name)
1232
1233 def destroy_segment(self, segment_name):
1234 """Calls unlink() on the shared memory block with the supplied name
1235 and removes it from the list of blocks being tracked."""
1236 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1237 self.segment_names.remove(segment_name)
1238 segment = shared_memory.SharedMemory(segment_name)
1239 segment.close()
1240 segment.unlink()
1241
1242 def unlink(self):
1243 "Calls destroy_segment() on all tracked shared memory blocks."
1244 for segment_name in self.segment_names[:]:
1245 self.destroy_segment(segment_name)
1246
1247 def __del__(self):
1248 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1249 self.unlink()
1250
1251 def __getstate__(self):
1252 return (self.shared_memory_context_name, self.segment_names)
1253
1254 def __setstate__(self, state):
1255 self.__init__(*state)
1256
1257
    class SharedMemoryServer(Server):
        """Server that also keeps a _SharedMemoryTracker of shared memory
        block names and offers methods to add, release and list them."""

        # NOTE(review): presumably the names clients may call through
        # dispatch(conn, None, ...) -- confirm against Server.
        public = Server.public + \
                 ['track_segment', 'release_segment', 'list_segments']

        def __init__(self, *args, **kwargs):
            Server.__init__(self, *args, **kwargs)
            # the tracker name combines the server address and this pid
            self.shared_memory_context = \
                _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
            util.debug(f"SharedMemoryServer started by pid {getpid()}")

        def create(self, c, typeid, /, *args, **kwargs):
            """Create a new distributed-shared object (not backed by a shared
            memory block) and return its id to be used in a Proxy Object."""
            # Unless set up as a shared proxy, don't make shared_memory_context
            # a standard part of kwargs.  This makes things easier for supplying
            # simple functions.
            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
                kwargs['shared_memory_context'] = self.shared_memory_context
            return Server.create(self, c, typeid, *args, **kwargs)

        def shutdown(self, c):
            "Call unlink() on all tracked shared memory, terminate the Server."
            self.shared_memory_context.unlink()
            return Server.shutdown(self, c)

        def track_segment(self, c, segment_name):
            "Adds the supplied shared memory block name to Server's tracker."
            self.shared_memory_context.register_segment(segment_name)

        def release_segment(self, c, segment_name):
            """Calls unlink() on the shared memory block with the supplied name
            and removes it from the tracker instance inside the Server."""
            self.shared_memory_context.destroy_segment(segment_name)

        def list_segments(self, c):
            """Returns a list of names of shared memory blocks that the Server
            is currently tracking."""
            # this is the tracker's own list object, not a copy
            return self.shared_memory_context.segment_names
1297
1298
1299 class SharedMemoryManager(BaseManager):
1300 """Like SyncManager but uses SharedMemoryServer instead of Server.
1301
1302 It provides methods for creating and returning SharedMemory instances
1303 and for creating a list-like object (ShareableList) backed by shared
1304 memory. It also provides methods that create and return Proxy Objects
1305 that support synchronization across processes (i.e. multi-process-safe
1306 locks and semaphores).
1307 """
1308
1309 _Server = SharedMemoryServer
1310
1311 def __init__(self, *args, **kwargs):
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02001312 if os.name == "posix":
1313 # bpo-36867: Ensure the resource_tracker is running before
1314 # launching the manager process, so that concurrent
1315 # shared_memory manipulation both in the manager and in the
1316 # current process does not create two resource_tracker
1317 # processes.
1318 from . import resource_tracker
1319 resource_tracker.ensure_running()
Davin Pottse895de32019-02-23 22:08:16 -06001320 BaseManager.__init__(self, *args, **kwargs)
1321 util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1322
1323 def __del__(self):
1324 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1325 pass
1326
1327 def get_server(self):
1328 'Better than monkeypatching for now; merge into Server ultimately'
1329 if self._state.value != State.INITIAL:
1330 if self._state.value == State.STARTED:
1331 raise ProcessError("Already started SharedMemoryServer")
1332 elif self._state.value == State.SHUTDOWN:
1333 raise ProcessError("SharedMemoryManager has shut down")
1334 else:
1335 raise ProcessError(
1336 "Unknown state {!r}".format(self._state.value))
1337 return self._Server(self._registry, self._address,
1338 self._authkey, self._serializer)
1339
1340 def SharedMemory(self, size):
1341 """Returns a new SharedMemory instance with the specified size in
1342 bytes, to be tracked by the manager."""
1343 with self._Client(self._address, authkey=self._authkey) as conn:
1344 sms = shared_memory.SharedMemory(None, create=True, size=size)
1345 try:
1346 dispatch(conn, None, 'track_segment', (sms.name,))
1347 except BaseException as e:
1348 sms.unlink()
1349 raise e
1350 return sms
1351
1352 def ShareableList(self, sequence):
1353 """Returns a new ShareableList instance populated with the values
1354 from the input sequence, to be tracked by the manager."""
1355 with self._Client(self._address, authkey=self._authkey) as conn:
1356 sl = shared_memory.ShareableList(sequence)
1357 try:
1358 dispatch(conn, None, 'track_segment', (sl.shm.name,))
1359 except BaseException as e:
1360 sl.shm.unlink()
1361 raise e
1362 return sl