blob: 0eb16c664cfab3608f13ac372be2a31491db452d [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Davin Pottse895de32019-02-23 22:08:16 -06002# Module providing manager classes for dealing
Benjamin Petersone711caf2008-06-11 16:44:04 +00003# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Davin Pottse895de32019-02-23 22:08:16 -060011__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12 'SharedMemoryManager' ]
Benjamin Petersone711caf2008-06-11 16:44:04 +000013
14#
15# Imports
16#
17
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import threading
Pierre Glaserd0d64ad2019-05-10 20:42:35 +020020import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000022import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020023import time
Batuhan Taşkaya03615562020-04-10 17:46:36 +030024import types
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020025import os
Davin Pottse895de32019-02-23 22:08:16 -060026from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010028from traceback import format_exc
29
30from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050031from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010032from . import pool
33from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010034from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010035from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060036try:
37 from . import shared_memory
38 HAS_SHMEM = True
39except ImportError:
40 HAS_SHMEM = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Benjamin Petersone711caf2008-06-11 16:44:04 +000042#
Benjamin Petersone711caf2008-06-11 16:44:04 +000043# Register some things for pickling
44#
45
46def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000047 return array.array, (a.typecode, a.tobytes())
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010048reduction.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
50view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000051if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000052 def rebuild_as_list(obj):
53 return list, (list(obj),)
54 for view_type in view_types:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010055 reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000056
57#
58# Type for identifying shared objects
59#
60
61class Token(object):
62 '''
Galdenc6066242020-04-18 14:58:29 +080063 Type to uniquely identify a shared object
Benjamin Petersone711caf2008-06-11 16:44:04 +000064 '''
65 __slots__ = ('typeid', 'address', 'id')
66
67 def __init__(self, typeid, address, id):
68 (self.typeid, self.address, self.id) = (typeid, address, id)
69
70 def __getstate__(self):
71 return (self.typeid, self.address, self.id)
72
73 def __setstate__(self, state):
74 (self.typeid, self.address, self.id) = state
75
76 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +030077 return '%s(typeid=%r, address=%r, id=%r)' % \
78 (self.__class__.__name__, self.typeid, self.address, self.id)
Benjamin Petersone711caf2008-06-11 16:44:04 +000079
80#
81# Function for communication with a manager's server process
82#
83
84def dispatch(c, id, methodname, args=(), kwds={}):
85 '''
86 Send a message to manager using connection `c` and return response
87 '''
88 c.send((id, methodname, args, kwds))
89 kind, result = c.recv()
90 if kind == '#RETURN':
91 return result
92 raise convert_to_error(kind, result)
93
94def convert_to_error(kind, result):
95 if kind == '#ERROR':
96 return result
Allen W. Smith, Ph.D48d98232017-08-12 10:37:09 -050097 elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
98 if not isinstance(result, str):
99 raise TypeError(
100 "Result {0!r} (kind '{1}') type is {2}, not str".format(
101 result, kind, type(result)))
102 if kind == '#UNSERIALIZABLE':
103 return RemoteError('Unserializable message: %s\n' % result)
104 else:
105 return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000106 else:
Allen W. Smith, Ph.D48d98232017-08-12 10:37:09 -0500107 return ValueError('Unrecognized message type {!r}'.format(kind))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108
109class RemoteError(Exception):
110 def __str__(self):
111 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
112
113#
114# Functions for finding the method names of an object
115#
116
117def all_methods(obj):
118 '''
119 Return a list of names of methods of `obj`
120 '''
121 temp = []
122 for name in dir(obj):
123 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200124 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000125 temp.append(name)
126 return temp
127
128def public_methods(obj):
129 '''
130 Return a list of names of methods of `obj` which do not start with '_'
131 '''
132 return [name for name in all_methods(obj) if name[0] != '_']
133
134#
135# Server which is run in a process controlled by a manager
136#
137
138class Server(object):
139 '''
140 Server class which runs in a process controlled by a manager object
141 '''
142 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
143 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
144
145 def __init__(self, registry, address, authkey, serializer):
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500146 if not isinstance(authkey, bytes):
147 raise TypeError(
148 "Authkey {0!r} is type {1!s}, not bytes".format(
149 authkey, type(authkey)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000150 self.registry = registry
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100151 self.authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000152 Listener, Client = listener_client[serializer]
153
154 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100155 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000156 self.address = self.listener.address
157
Jesse Noller63b3a972009-01-21 02:15:48 +0000158 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159 self.id_to_refcount = {}
Davin Potts86a76682016-09-07 18:48:01 -0500160 self.id_to_local_proxy_obj = {}
161 self.mutex = threading.Lock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162
163 def serve_forever(self):
164 '''
165 Run the server forever
166 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100167 self.stop_event = threading.Event()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100168 process.current_process()._manager_server = self
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100170 accepter = threading.Thread(target=self.accepter)
171 accepter.daemon = True
172 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100174 while not self.stop_event.is_set():
175 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000176 except (KeyboardInterrupt, SystemExit):
177 pass
178 finally:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500179 if sys.stdout != sys.__stdout__: # what about stderr?
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100180 util.debug('resetting stdout, stderr')
181 sys.stdout = sys.__stdout__
182 sys.stderr = sys.__stderr__
183 sys.exit(0)
184
185 def accepter(self):
186 while True:
187 try:
188 c = self.listener.accept()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200189 except OSError:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100190 continue
191 t = threading.Thread(target=self.handle_request, args=(c,))
192 t.daemon = True
193 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000194
195 def handle_request(self, c):
196 '''
197 Handle a new connection
198 '''
199 funcname = result = request = None
200 try:
201 connection.deliver_challenge(c, self.authkey)
202 connection.answer_challenge(c, self.authkey)
203 request = c.recv()
204 ignore, funcname, args, kwds = request
205 assert funcname in self.public, '%r unrecognized' % funcname
206 func = getattr(self, funcname)
207 except Exception:
208 msg = ('#TRACEBACK', format_exc())
209 else:
210 try:
211 result = func(c, *args, **kwds)
212 except Exception:
213 msg = ('#TRACEBACK', format_exc())
214 else:
215 msg = ('#RETURN', result)
216 try:
217 c.send(msg)
218 except Exception as e:
219 try:
220 c.send(('#TRACEBACK', format_exc()))
221 except Exception:
222 pass
223 util.info('Failure to send message: %r', msg)
224 util.info(' ... request was %r', request)
225 util.info(' ... exception was %r', e)
226
227 c.close()
228
229 def serve_client(self, conn):
230 '''
231 Handle requests from the proxies in a particular process/thread
232 '''
233 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000234 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000235
236 recv = conn.recv
237 send = conn.send
238 id_to_obj = self.id_to_obj
239
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100240 while not self.stop_event.is_set():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 try:
243 methodname = obj = None
244 request = recv()
245 ident, methodname, args, kwds = request
Davin Potts86a76682016-09-07 18:48:01 -0500246 try:
247 obj, exposed, gettypeid = id_to_obj[ident]
248 except KeyError as ke:
249 try:
250 obj, exposed, gettypeid = \
251 self.id_to_local_proxy_obj[ident]
Pablo Galindo293dd232019-11-19 21:34:03 +0000252 except KeyError:
Davin Potts86a76682016-09-07 18:48:01 -0500253 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000254
255 if methodname not in exposed:
256 raise AttributeError(
257 'method %r of %r object is not in exposed=%r' %
258 (methodname, type(obj), exposed)
259 )
260
261 function = getattr(obj, methodname)
262
263 try:
264 res = function(*args, **kwds)
265 except Exception as e:
266 msg = ('#ERROR', e)
267 else:
268 typeid = gettypeid and gettypeid.get(methodname, None)
269 if typeid:
270 rident, rexposed = self.create(conn, typeid, res)
271 token = Token(typeid, self.address, rident)
272 msg = ('#PROXY', (rexposed, token))
273 else:
274 msg = ('#RETURN', res)
275
276 except AttributeError:
277 if methodname is None:
278 msg = ('#TRACEBACK', format_exc())
279 else:
280 try:
281 fallback_func = self.fallback_mapping[methodname]
282 result = fallback_func(
283 self, conn, ident, obj, *args, **kwds
284 )
285 msg = ('#RETURN', result)
286 except Exception:
287 msg = ('#TRACEBACK', format_exc())
288
289 except EOFError:
290 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000291 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000292 sys.exit(0)
293
294 except Exception:
295 msg = ('#TRACEBACK', format_exc())
296
297 try:
298 try:
299 send(msg)
Pablo Galindo293dd232019-11-19 21:34:03 +0000300 except Exception:
Davin Potts37156a72016-09-08 14:40:36 -0500301 send(('#UNSERIALIZABLE', format_exc()))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302 except Exception as e:
303 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000304 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000305 util.info(' ... message was %r', msg)
306 util.info(' ... exception was %r', e)
307 conn.close()
308 sys.exit(1)
309
310 def fallback_getvalue(self, conn, ident, obj):
311 return obj
312
313 def fallback_str(self, conn, ident, obj):
314 return str(obj)
315
316 def fallback_repr(self, conn, ident, obj):
317 return repr(obj)
318
319 fallback_mapping = {
320 '__str__':fallback_str,
321 '__repr__':fallback_repr,
322 '#GETVALUE':fallback_getvalue
323 }
324
325 def dummy(self, c):
326 pass
327
328 def debug_info(self, c):
329 '''
330 Return some info --- useful to spot problems with refcounting
331 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500332 # Perhaps include debug info about 'c'?
Charles-François Natalia924fc72014-05-25 14:12:12 +0100333 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000334 result = []
Davin Potts86a76682016-09-07 18:48:01 -0500335 keys = list(self.id_to_refcount.keys())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336 keys.sort()
337 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000338 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000339 result.append(' %s: refcount=%s\n %s' %
340 (ident, self.id_to_refcount[ident],
341 str(self.id_to_obj[ident][0])[:75]))
342 return '\n'.join(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000343
344 def number_of_objects(self, c):
345 '''
346 Number of shared objects
347 '''
Davin Potts86a76682016-09-07 18:48:01 -0500348 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
349 return len(self.id_to_refcount)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000350
351 def shutdown(self, c):
352 '''
353 Shutdown this process
354 '''
355 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100356 util.debug('manager received shutdown message')
357 c.send(('#RETURN', None))
358 except:
359 import traceback
360 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000361 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100362 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000363
Serhiy Storchaka142566c2019-06-05 18:22:31 +0300364 def create(self, c, typeid, /, *args, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000365 '''
366 Create a new shared object and return its id
367 '''
Charles-François Natalia924fc72014-05-25 14:12:12 +0100368 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000369 callable, exposed, method_to_typeid, proxytype = \
370 self.registry[typeid]
371
372 if callable is None:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500373 if kwds or (len(args) != 1):
374 raise ValueError(
375 "Without callable, must have one non-keyword argument")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000376 obj = args[0]
377 else:
378 obj = callable(*args, **kwds)
379
380 if exposed is None:
381 exposed = public_methods(obj)
382 if method_to_typeid is not None:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500383 if not isinstance(method_to_typeid, dict):
384 raise TypeError(
385 "Method_to_typeid {0!r}: type {1!s}, not dict".format(
386 method_to_typeid, type(method_to_typeid)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000387 exposed = list(exposed) + list(method_to_typeid)
388
389 ident = '%x' % id(obj) # convert to string because xmlrpclib
390 # only has 32 bit signed integers
391 util.debug('%r callable returned object with id %r', typeid, ident)
392
393 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
394 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000395 self.id_to_refcount[ident] = 0
Davin Potts86a76682016-09-07 18:48:01 -0500396
397 self.incref(c, ident)
398 return ident, tuple(exposed)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000399
400 def get_methods(self, c, token):
401 '''
402 Return the methods of the shared object indicated by token
403 '''
404 return tuple(self.id_to_obj[token.id][1])
405
406 def accept_connection(self, c, name):
407 '''
408 Spawn a new thread to serve this connection
409 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000410 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000411 c.send(('#RETURN', None))
412 self.serve_client(c)
413
414 def incref(self, c, ident):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100415 with self.mutex:
Davin Potts86a76682016-09-07 18:48:01 -0500416 try:
417 self.id_to_refcount[ident] += 1
418 except KeyError as ke:
419 # If no external references exist but an internal (to the
420 # manager) still does and a new external reference is created
421 # from it, restore the manager's tracking of it from the
422 # previously stashed internal ref.
423 if ident in self.id_to_local_proxy_obj:
424 self.id_to_refcount[ident] = 1
425 self.id_to_obj[ident] = \
426 self.id_to_local_proxy_obj[ident]
427 obj, exposed, gettypeid = self.id_to_obj[ident]
428 util.debug('Server re-enabled tracking & INCREF %r', ident)
429 else:
430 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000431
432 def decref(self, c, ident):
Davin Potts86a76682016-09-07 18:48:01 -0500433 if ident not in self.id_to_refcount and \
434 ident in self.id_to_local_proxy_obj:
435 util.debug('Server DECREF skipping %r', ident)
436 return
437
Charles-François Natalia924fc72014-05-25 14:12:12 +0100438 with self.mutex:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500439 if self.id_to_refcount[ident] <= 0:
440 raise AssertionError(
441 "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
442 ident, self.id_to_obj[ident],
443 self.id_to_refcount[ident]))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000444 self.id_to_refcount[ident] -= 1
445 if self.id_to_refcount[ident] == 0:
Davin Potts86a76682016-09-07 18:48:01 -0500446 del self.id_to_refcount[ident]
447
448 if ident not in self.id_to_refcount:
449 # Two-step process in case the object turns out to contain other
450 # proxy objects (e.g. a managed list of managed lists).
451 # Otherwise, deleting self.id_to_obj[ident] would trigger the
452 # deleting of the stored value (another managed object) which would
453 # in turn attempt to acquire the mutex that is already held here.
454 self.id_to_obj[ident] = (None, (), None) # thread-safe
455 util.debug('disposing of obj with id %r', ident)
456 with self.mutex:
457 del self.id_to_obj[ident]
458
Benjamin Petersone711caf2008-06-11 16:44:04 +0000459
460#
461# Class to represent state of a manager
462#
463
464class State(object):
465 __slots__ = ['value']
466 INITIAL = 0
467 STARTED = 1
468 SHUTDOWN = 2
469
470#
471# Mapping from serializer name to Listener and Client types
472#
473
474listener_client = {
475 'pickle' : (connection.Listener, connection.Client),
476 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
477 }
478
479#
480# Definition of BaseManager
481#
482
483class BaseManager(object):
484 '''
485 Base class for managers
486 '''
487 _registry = {}
488 _Server = Server
489
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100490 def __init__(self, address=None, authkey=None, serializer='pickle',
491 ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000492 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100493 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000494 self._address = address # XXX not final address if eg ('', 0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100495 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000496 self._state = State()
497 self._state.value = State.INITIAL
498 self._serializer = serializer
499 self._Listener, self._Client = listener_client[serializer]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100500 self._ctx = ctx or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000501
Benjamin Petersone711caf2008-06-11 16:44:04 +0000502 def get_server(self):
503 '''
504 Return server object with serve_forever() method and address attribute
505 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500506 if self._state.value != State.INITIAL:
507 if self._state.value == State.STARTED:
508 raise ProcessError("Already started server")
509 elif self._state.value == State.SHUTDOWN:
510 raise ProcessError("Manager has shut down")
511 else:
512 raise ProcessError(
513 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000514 return Server(self._registry, self._address,
515 self._authkey, self._serializer)
516
517 def connect(self):
518 '''
519 Connect manager object to the server process
520 '''
521 Listener, Client = listener_client[self._serializer]
522 conn = Client(self._address, authkey=self._authkey)
523 dispatch(conn, None, 'dummy')
524 self._state.value = State.STARTED
525
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000526 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000527 '''
528 Spawn a server process for this manager object
529 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500530 if self._state.value != State.INITIAL:
531 if self._state.value == State.STARTED:
532 raise ProcessError("Already started server")
533 elif self._state.value == State.SHUTDOWN:
534 raise ProcessError("Manager has shut down")
535 else:
536 raise ProcessError(
537 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000538
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200539 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000540 raise TypeError('initializer must be a callable')
541
Benjamin Petersone711caf2008-06-11 16:44:04 +0000542 # pipe over which we will retrieve address of server
543 reader, writer = connection.Pipe(duplex=False)
544
545 # spawn process which runs a server
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100546 self._process = self._ctx.Process(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000547 target=type(self)._run_server,
548 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000549 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000550 )
551 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000552 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000553 self._process.start()
554
555 # get address of server
556 writer.close()
557 self._address = reader.recv()
558 reader.close()
559
560 # register a finalizer
561 self._state.value = State.STARTED
562 self.shutdown = util.Finalize(
563 self, type(self)._finalize_manager,
564 args=(self._process, self._address, self._authkey,
565 self._state, self._Client),
566 exitpriority=0
567 )
568
569 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000570 def _run_server(cls, registry, address, authkey, serializer, writer,
571 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000572 '''
573 Create a server, report its address and run it
574 '''
Pierre Glaserd0d64ad2019-05-10 20:42:35 +0200575 # bpo-36368: protect server process from KeyboardInterrupt signals
576 signal.signal(signal.SIGINT, signal.SIG_IGN)
577
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000578 if initializer is not None:
579 initializer(*initargs)
580
Benjamin Petersone711caf2008-06-11 16:44:04 +0000581 # create server
582 server = cls._Server(registry, address, authkey, serializer)
583
584 # inform parent process of the server's address
585 writer.send(server.address)
586 writer.close()
587
588 # run the manager
589 util.info('manager serving at %r', server.address)
590 server.serve_forever()
591
Serhiy Storchaka2085bd02019-06-01 11:00:15 +0300592 def _create(self, typeid, /, *args, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000593 '''
594 Create a new shared object; return the token and exposed tuple
595 '''
596 assert self._state.value == State.STARTED, 'server not yet started'
597 conn = self._Client(self._address, authkey=self._authkey)
598 try:
599 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
600 finally:
601 conn.close()
602 return Token(typeid, self._address, id), exposed
603
604 def join(self, timeout=None):
605 '''
606 Join the manager process (if it has been spawned)
607 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100608 if self._process is not None:
609 self._process.join(timeout)
610 if not self._process.is_alive():
611 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000612
613 def _debug_info(self):
614 '''
615 Return some info about the servers shared objects and connections
616 '''
617 conn = self._Client(self._address, authkey=self._authkey)
618 try:
619 return dispatch(conn, None, 'debug_info')
620 finally:
621 conn.close()
622
623 def _number_of_objects(self):
624 '''
625 Return the number of shared objects
626 '''
627 conn = self._Client(self._address, authkey=self._authkey)
628 try:
629 return dispatch(conn, None, 'number_of_objects')
630 finally:
631 conn.close()
632
633 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100634 if self._state.value == State.INITIAL:
635 self.start()
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500636 if self._state.value != State.STARTED:
637 if self._state.value == State.INITIAL:
638 raise ProcessError("Unable to start server")
639 elif self._state.value == State.SHUTDOWN:
640 raise ProcessError("Manager has shut down")
641 else:
642 raise ProcessError(
643 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000644 return self
645
646 def __exit__(self, exc_type, exc_val, exc_tb):
647 self.shutdown()
648
649 @staticmethod
650 def _finalize_manager(process, address, authkey, state, _Client):
651 '''
652 Shutdown the manager process; will be registered as a finalizer
653 '''
654 if process.is_alive():
655 util.info('sending shutdown message to manager')
656 try:
657 conn = _Client(address, authkey=authkey)
658 try:
659 dispatch(conn, None, 'shutdown')
660 finally:
661 conn.close()
662 except Exception:
663 pass
664
Richard Oudkerk3049f122012-06-15 20:08:29 +0100665 process.join(timeout=1.0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000666 if process.is_alive():
667 util.info('manager still alive')
668 if hasattr(process, 'terminate'):
669 util.info('trying to `terminate()` manager process')
670 process.terminate()
671 process.join(timeout=0.1)
672 if process.is_alive():
673 util.info('manager still alive after terminate')
674
675 state.value = State.SHUTDOWN
676 try:
677 del BaseProxy._address_to_local[address]
678 except KeyError:
679 pass
680
Serhiy Storchakabdf6b912017-03-19 08:40:32 +0200681 @property
682 def address(self):
683 return self._address
Benjamin Petersone711caf2008-06-11 16:44:04 +0000684
685 @classmethod
686 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
687 method_to_typeid=None, create_method=True):
688 '''
689 Register a typeid with the manager type
690 '''
691 if '_registry' not in cls.__dict__:
692 cls._registry = cls._registry.copy()
693
694 if proxytype is None:
695 proxytype = AutoProxy
696
697 exposed = exposed or getattr(proxytype, '_exposed_', None)
698
699 method_to_typeid = method_to_typeid or \
700 getattr(proxytype, '_method_to_typeid_', None)
701
702 if method_to_typeid:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500703 for key, value in list(method_to_typeid.items()): # isinstance?
Benjamin Petersone711caf2008-06-11 16:44:04 +0000704 assert type(key) is str, '%r is not a string' % key
705 assert type(value) is str, '%r is not a string' % value
706
707 cls._registry[typeid] = (
708 callable, exposed, method_to_typeid, proxytype
709 )
710
711 if create_method:
Serhiy Storchaka2085bd02019-06-01 11:00:15 +0300712 def temp(self, /, *args, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000713 util.debug('requesting creation of a shared %r object', typeid)
714 token, exp = self._create(typeid, *args, **kwds)
715 proxy = proxytype(
716 token, self._serializer, manager=self,
717 authkey=self._authkey, exposed=exp
718 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000719 conn = self._Client(token.address, authkey=self._authkey)
720 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000721 return proxy
722 temp.__name__ = typeid
723 setattr(cls, typeid, temp)
724
725#
726# Subclass of set which get cleared after a fork
727#
728
729class ProcessLocalSet(set):
730 def __init__(self):
731 util.register_after_fork(self, lambda obj: obj.clear())
732 def __reduce__(self):
733 return type(self), ()
734
735#
736# Definition of BaseProxy
737#
738
739class BaseProxy(object):
740 '''
741 A base for proxies of shared objects
742 '''
743 _address_to_local = {}
744 _mutex = util.ForkAwareThreadLock()
745
746 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500747 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100748 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000749 tls_idset = BaseProxy._address_to_local.get(token.address, None)
750 if tls_idset is None:
751 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
752 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000753
754 # self._tls is used to record the connection used by this
755 # thread to communicate with the manager at token.address
756 self._tls = tls_idset[0]
757
758 # self._idset is used to record the identities of all shared
759 # objects for which the current process owns references and
760 # which are in the manager at token.address
761 self._idset = tls_idset[1]
762
763 self._token = token
764 self._id = self._token.id
765 self._manager = manager
766 self._serializer = serializer
767 self._Client = listener_client[serializer][1]
768
Davin Potts86a76682016-09-07 18:48:01 -0500769 # Should be set to True only when a proxy object is being created
770 # on the manager server; primary use case: nested proxy objects.
771 # RebuildProxy detects when a proxy is being created on the manager
772 # and sets this value appropriately.
773 self._owned_by_manager = manager_owned
774
Benjamin Petersone711caf2008-06-11 16:44:04 +0000775 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100776 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000777 elif self._manager is not None:
778 self._authkey = self._manager._authkey
779 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100780 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000781
782 if incref:
783 self._incref()
784
785 util.register_after_fork(self, BaseProxy._after_fork)
786
787 def _connect(self):
788 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100789 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000790 if threading.current_thread().name != 'MainThread':
791 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000792 conn = self._Client(self._token.address, authkey=self._authkey)
793 dispatch(conn, None, 'accept_connection', (name,))
794 self._tls.connection = conn
795
796 def _callmethod(self, methodname, args=(), kwds={}):
797 '''
Galdenc6066242020-04-18 14:58:29 +0800798 Try to call a method of the referent and return a copy of the result
Benjamin Petersone711caf2008-06-11 16:44:04 +0000799 '''
800 try:
801 conn = self._tls.connection
802 except AttributeError:
803 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000804 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000805 self._connect()
806 conn = self._tls.connection
807
808 conn.send((self._id, methodname, args, kwds))
809 kind, result = conn.recv()
810
811 if kind == '#RETURN':
812 return result
813 elif kind == '#PROXY':
814 exposed, token = result
815 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100816 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000817 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000818 token, self._serializer, manager=self._manager,
819 authkey=self._authkey, exposed=exposed
820 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000821 conn = self._Client(token.address, authkey=self._authkey)
822 dispatch(conn, None, 'decref', (token.id,))
823 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000824 raise convert_to_error(kind, result)
825
826 def _getvalue(self):
827 '''
828 Get a copy of the value of the referent
829 '''
830 return self._callmethod('#GETVALUE')
831
832 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500833 if self._owned_by_manager:
834 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
835 return
836
Benjamin Petersone711caf2008-06-11 16:44:04 +0000837 conn = self._Client(self._token.address, authkey=self._authkey)
838 dispatch(conn, None, 'incref', (self._id,))
839 util.debug('INCREF %r', self._token.id)
840
841 self._idset.add(self._id)
842
843 state = self._manager and self._manager._state
844
845 self._close = util.Finalize(
846 self, BaseProxy._decref,
847 args=(self._token, self._authkey, state,
848 self._tls, self._idset, self._Client),
849 exitpriority=10
850 )
851
852 @staticmethod
853 def _decref(token, authkey, state, tls, idset, _Client):
854 idset.discard(token.id)
855
856 # check whether manager is still alive
857 if state is None or state.value == State.STARTED:
858 # tell manager this process no longer cares about referent
859 try:
860 util.debug('DECREF %r', token.id)
861 conn = _Client(token.address, authkey=authkey)
862 dispatch(conn, None, 'decref', (token.id,))
863 except Exception as e:
864 util.debug('... decref failed %s', e)
865
866 else:
867 util.debug('DECREF %r -- manager already shutdown', token.id)
868
869 # check whether we can close this thread's connection because
870 # the process owns no more references to objects for this manager
871 if not idset and hasattr(tls, 'connection'):
872 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000873 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000874 tls.connection.close()
875 del tls.connection
876
877 def _after_fork(self):
878 self._manager = None
879 try:
880 self._incref()
881 except Exception as e:
882 # the proxy may just be for a manager which has shutdown
883 util.info('incref failed: %s' % e)
884
885 def __reduce__(self):
886 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500887 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 kwds['authkey'] = self._authkey
889
890 if getattr(self, '_isauto', False):
891 kwds['exposed'] = self._exposed_
892 return (RebuildProxy,
893 (AutoProxy, self._token, self._serializer, kwds))
894 else:
895 return (RebuildProxy,
896 (type(self), self._token, self._serializer, kwds))
897
898 def __deepcopy__(self, memo):
899 return self._getvalue()
900
901 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300902 return '<%s object, typeid %r at %#x>' % \
903 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904
905 def __str__(self):
906 '''
907 Return representation of the referent (or a fall-back if that fails)
908 '''
909 try:
910 return self._callmethod('__repr__')
911 except Exception:
912 return repr(self)[:-1] + "; '__str__()' failed>"
913
914#
915# Function used for unpickling
916#
917
918def RebuildProxy(func, token, serializer, kwds):
919 '''
920 Function used for unpickling proxy objects.
Benjamin Petersone711caf2008-06-11 16:44:04 +0000921 '''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100922 server = getattr(process.current_process(), '_manager_server', None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000923 if server and server.address == token.address:
Davin Potts86a76682016-09-07 18:48:01 -0500924 util.debug('Rebuild a proxy owned by manager, token=%r', token)
925 kwds['manager_owned'] = True
926 if token.id not in server.id_to_local_proxy_obj:
927 server.id_to_local_proxy_obj[token.id] = \
928 server.id_to_obj[token.id]
929 incref = (
930 kwds.pop('incref', True) and
931 not getattr(process.current_process(), '_inheriting', False)
932 )
933 return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000934
935#
936# Functions to create proxies and proxy types
937#
938
939def MakeProxyType(name, exposed, _cache={}):
940 '''
Serhiy Storchaka6a7b3a72016-04-17 08:32:47 +0300941 Return a proxy type whose methods are given by `exposed`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942 '''
943 exposed = tuple(exposed)
944 try:
945 return _cache[(name, exposed)]
946 except KeyError:
947 pass
948
949 dic = {}
950
951 for meth in exposed:
Serhiy Storchaka2085bd02019-06-01 11:00:15 +0300952 exec('''def %s(self, /, *args, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000953 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
954
955 ProxyType = type(name, (BaseProxy,), dic)
956 ProxyType._exposed_ = exposed
957 _cache[(name, exposed)] = ProxyType
958 return ProxyType
959
960
961def AutoProxy(token, serializer, manager=None, authkey=None,
962 exposed=None, incref=True):
963 '''
964 Return an auto-proxy for `token`
965 '''
966 _Client = listener_client[serializer][1]
967
968 if exposed is None:
969 conn = _Client(token.address, authkey=authkey)
970 try:
971 exposed = dispatch(conn, None, 'get_methods', (token,))
972 finally:
973 conn.close()
974
975 if authkey is None and manager is not None:
976 authkey = manager._authkey
977 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100978 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000979
980 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
981 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
982 incref=incref)
983 proxy._isauto = True
984 return proxy
985
986#
987# Types/callables which we will register with SyncManager
988#
989
990class Namespace(object):
Serhiy Storchaka2085bd02019-06-01 11:00:15 +0300991 def __init__(self, /, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000992 self.__dict__.update(kwds)
993 def __repr__(self):
994 items = list(self.__dict__.items())
995 temp = []
996 for name, value in items:
997 if not name.startswith('_'):
998 temp.append('%s=%r' % (name, value))
999 temp.sort()
Serhiy Storchaka465e60e2014-07-25 23:36:00 +03001000 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001001
1002class Value(object):
1003 def __init__(self, typecode, value, lock=True):
1004 self._typecode = typecode
1005 self._value = value
1006 def get(self):
1007 return self._value
1008 def set(self, value):
1009 self._value = value
1010 def __repr__(self):
1011 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
1012 value = property(get, set)
1013
1014def Array(typecode, sequence, lock=True):
1015 return array.array(typecode, sequence)
1016
1017#
1018# Proxy types used by SyncManager
1019#
1020
1021class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +00001022 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001023 def __iter__(self):
1024 return self
1025 def __next__(self, *args):
1026 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027 def send(self, *args):
1028 return self._callmethod('send', args)
1029 def throw(self, *args):
1030 return self._callmethod('throw', args)
1031 def close(self, *args):
1032 return self._callmethod('close', args)
1033
1034
1035class AcquirerProxy(BaseProxy):
1036 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +01001037 def acquire(self, blocking=True, timeout=None):
1038 args = (blocking,) if timeout is None else (blocking, timeout)
1039 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001040 def release(self):
1041 return self._callmethod('release')
1042 def __enter__(self):
1043 return self._callmethod('acquire')
1044 def __exit__(self, exc_type, exc_val, exc_tb):
1045 return self._callmethod('release')
1046
1047
1048class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +00001049 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050 def wait(self, timeout=None):
1051 return self._callmethod('wait', (timeout,))
Antoine Pitrou48350412017-07-04 08:59:22 +02001052 def notify(self, n=1):
1053 return self._callmethod('notify', (n,))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001054 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +00001055 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001056 def wait_for(self, predicate, timeout=None):
1057 result = predicate()
1058 if result:
1059 return result
1060 if timeout is not None:
Victor Stinnerc2368cb2018-07-06 13:51:52 +02001061 endtime = time.monotonic() + timeout
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001062 else:
1063 endtime = None
1064 waittime = None
1065 while not result:
1066 if endtime is not None:
Victor Stinnerc2368cb2018-07-06 13:51:52 +02001067 waittime = endtime - time.monotonic()
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001068 if waittime <= 0:
1069 break
1070 self.wait(waittime)
1071 result = predicate()
1072 return result
1073
Benjamin Petersone711caf2008-06-11 16:44:04 +00001074
1075class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +00001076 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001077 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +00001078 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001079 def set(self):
1080 return self._callmethod('set')
1081 def clear(self):
1082 return self._callmethod('clear')
1083 def wait(self, timeout=None):
1084 return self._callmethod('wait', (timeout,))
1085
Richard Oudkerk3730a172012-06-15 18:26:07 +01001086
1087class BarrierProxy(BaseProxy):
1088 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1089 def wait(self, timeout=None):
1090 return self._callmethod('wait', (timeout,))
1091 def abort(self):
1092 return self._callmethod('abort')
1093 def reset(self):
1094 return self._callmethod('reset')
1095 @property
1096 def parties(self):
1097 return self._callmethod('__getattribute__', ('parties',))
1098 @property
1099 def n_waiting(self):
1100 return self._callmethod('__getattribute__', ('n_waiting',))
1101 @property
1102 def broken(self):
1103 return self._callmethod('__getattribute__', ('broken',))
1104
1105
Benjamin Petersone711caf2008-06-11 16:44:04 +00001106class NamespaceProxy(BaseProxy):
1107 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1108 def __getattr__(self, key):
1109 if key[0] == '_':
1110 return object.__getattribute__(self, key)
1111 callmethod = object.__getattribute__(self, '_callmethod')
1112 return callmethod('__getattribute__', (key,))
1113 def __setattr__(self, key, value):
1114 if key[0] == '_':
1115 return object.__setattr__(self, key, value)
1116 callmethod = object.__getattribute__(self, '_callmethod')
1117 return callmethod('__setattr__', (key, value))
1118 def __delattr__(self, key):
1119 if key[0] == '_':
1120 return object.__delattr__(self, key)
1121 callmethod = object.__getattribute__(self, '_callmethod')
1122 return callmethod('__delattr__', (key,))
1123
1124
1125class ValueProxy(BaseProxy):
1126 _exposed_ = ('get', 'set')
1127 def get(self):
1128 return self._callmethod('get')
1129 def set(self, value):
1130 return self._callmethod('set', (value,))
1131 value = property(get, set)
1132
Batuhan Taşkaya03615562020-04-10 17:46:36 +03001133 __class_getitem__ = classmethod(types.GenericAlias)
1134
Benjamin Petersone711caf2008-06-11 16:44:04 +00001135
1136BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001137 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1138 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001139 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1140 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001141 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001142class ListProxy(BaseListProxy):
1143 def __iadd__(self, value):
1144 self._callmethod('extend', (value,))
1145 return self
1146 def __imul__(self, value):
1147 self._callmethod('__imul__', (value,))
1148 return self
1149
1150
1151DictProxy = MakeProxyType('DictProxy', (
Serhiy Storchakae0e50652018-09-17 14:24:01 +03001152 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
Rémi Lapeyrea31f4cc2019-02-12 01:37:24 +01001153 '__setitem__', 'clear', 'copy', 'get', 'items',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001154 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1155 ))
Serhiy Storchakae0e50652018-09-17 14:24:01 +03001156DictProxy._method_to_typeid_ = {
1157 '__iter__': 'Iterator',
1158 }
Benjamin Petersone711caf2008-06-11 16:44:04 +00001159
1160
1161ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001162 '__len__', '__getitem__', '__setitem__'
1163 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001164
1165
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001166BasePoolProxy = MakeProxyType('PoolProxy', (
Benjamin Petersone711caf2008-06-11 16:44:04 +00001167 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001168 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001169 ))
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001170BasePoolProxy._method_to_typeid_ = {
Benjamin Petersone711caf2008-06-11 16:44:04 +00001171 'apply_async': 'AsyncResult',
1172 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001173 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001174 'imap': 'Iterator',
1175 'imap_unordered': 'Iterator'
1176 }
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001177class PoolProxy(BasePoolProxy):
1178 def __enter__(self):
1179 return self
1180 def __exit__(self, exc_type, exc_val, exc_tb):
1181 self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001182
1183#
1184# Definition of SyncManager
1185#
1186
1187class SyncManager(BaseManager):
1188 '''
1189 Subclass of `BaseManager` which supports a number of shared object types.
1190
1191 The types registered are those intended for the synchronization
1192 of threads, plus `dict`, `list` and `Namespace`.
1193
1194 The `multiprocessing.Manager()` function creates started instances of
1195 this class.
1196 '''
1197
1198SyncManager.register('Queue', queue.Queue)
1199SyncManager.register('JoinableQueue', queue.Queue)
1200SyncManager.register('Event', threading.Event, EventProxy)
1201SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1202SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1203SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1204SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1205 AcquirerProxy)
1206SyncManager.register('Condition', threading.Condition, ConditionProxy)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001207SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001208SyncManager.register('Pool', pool.Pool, PoolProxy)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001209SyncManager.register('list', list, ListProxy)
1210SyncManager.register('dict', dict, DictProxy)
1211SyncManager.register('Value', Value, ValueProxy)
1212SyncManager.register('Array', Array, ArrayProxy)
1213SyncManager.register('Namespace', Namespace, NamespaceProxy)
1214
1215# types returned by methods of PoolProxy
1216SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1217SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001218
1219#
1220# Definition of SharedMemoryManager and SharedMemoryServer
1221#
1222
1223if HAS_SHMEM:
1224 class _SharedMemoryTracker:
1225 "Manages one or more shared memory segments."
1226
1227 def __init__(self, name, segment_names=[]):
1228 self.shared_memory_context_name = name
1229 self.segment_names = segment_names
1230
1231 def register_segment(self, segment_name):
1232 "Adds the supplied shared memory block name to tracker."
1233 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1234 self.segment_names.append(segment_name)
1235
1236 def destroy_segment(self, segment_name):
1237 """Calls unlink() on the shared memory block with the supplied name
1238 and removes it from the list of blocks being tracked."""
1239 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1240 self.segment_names.remove(segment_name)
1241 segment = shared_memory.SharedMemory(segment_name)
1242 segment.close()
1243 segment.unlink()
1244
1245 def unlink(self):
1246 "Calls destroy_segment() on all tracked shared memory blocks."
1247 for segment_name in self.segment_names[:]:
1248 self.destroy_segment(segment_name)
1249
1250 def __del__(self):
1251 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1252 self.unlink()
1253
1254 def __getstate__(self):
1255 return (self.shared_memory_context_name, self.segment_names)
1256
1257 def __setstate__(self, state):
1258 self.__init__(*state)
1259
1260
1261 class SharedMemoryServer(Server):
1262
1263 public = Server.public + \
1264 ['track_segment', 'release_segment', 'list_segments']
1265
1266 def __init__(self, *args, **kwargs):
1267 Server.__init__(self, *args, **kwargs)
Pablo Galindo6012f302020-03-09 13:48:01 +00001268 address = self.address
1269 # The address of Linux abstract namespaces can be bytes
1270 if isinstance(address, bytes):
1271 address = os.fsdecode(address)
Davin Pottse895de32019-02-23 22:08:16 -06001272 self.shared_memory_context = \
Pablo Galindo6012f302020-03-09 13:48:01 +00001273 _SharedMemoryTracker(f"shm_{address}_{getpid()}")
Davin Pottse895de32019-02-23 22:08:16 -06001274 util.debug(f"SharedMemoryServer started by pid {getpid()}")
1275
Serhiy Storchaka142566c2019-06-05 18:22:31 +03001276 def create(self, c, typeid, /, *args, **kwargs):
Davin Pottse895de32019-02-23 22:08:16 -06001277 """Create a new distributed-shared object (not backed by a shared
1278 memory block) and return its id to be used in a Proxy Object."""
1279 # Unless set up as a shared proxy, don't make shared_memory_context
1280 # a standard part of kwargs. This makes things easier for supplying
1281 # simple functions.
1282 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1283 kwargs['shared_memory_context'] = self.shared_memory_context
Serhiy Storchaka142566c2019-06-05 18:22:31 +03001284 return Server.create(self, c, typeid, *args, **kwargs)
Davin Pottse895de32019-02-23 22:08:16 -06001285
1286 def shutdown(self, c):
1287 "Call unlink() on all tracked shared memory, terminate the Server."
1288 self.shared_memory_context.unlink()
1289 return Server.shutdown(self, c)
1290
1291 def track_segment(self, c, segment_name):
1292 "Adds the supplied shared memory block name to Server's tracker."
1293 self.shared_memory_context.register_segment(segment_name)
1294
1295 def release_segment(self, c, segment_name):
1296 """Calls unlink() on the shared memory block with the supplied name
1297 and removes it from the tracker instance inside the Server."""
1298 self.shared_memory_context.destroy_segment(segment_name)
1299
1300 def list_segments(self, c):
1301 """Returns a list of names of shared memory blocks that the Server
1302 is currently tracking."""
1303 return self.shared_memory_context.segment_names
1304
1305
1306 class SharedMemoryManager(BaseManager):
1307 """Like SyncManager but uses SharedMemoryServer instead of Server.
1308
1309 It provides methods for creating and returning SharedMemory instances
1310 and for creating a list-like object (ShareableList) backed by shared
1311 memory. It also provides methods that create and return Proxy Objects
1312 that support synchronization across processes (i.e. multi-process-safe
1313 locks and semaphores).
1314 """
1315
1316 _Server = SharedMemoryServer
1317
1318 def __init__(self, *args, **kwargs):
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02001319 if os.name == "posix":
1320 # bpo-36867: Ensure the resource_tracker is running before
1321 # launching the manager process, so that concurrent
1322 # shared_memory manipulation both in the manager and in the
1323 # current process does not create two resource_tracker
1324 # processes.
1325 from . import resource_tracker
1326 resource_tracker.ensure_running()
Davin Pottse895de32019-02-23 22:08:16 -06001327 BaseManager.__init__(self, *args, **kwargs)
1328 util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1329
1330 def __del__(self):
1331 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1332 pass
1333
1334 def get_server(self):
1335 'Better than monkeypatching for now; merge into Server ultimately'
1336 if self._state.value != State.INITIAL:
1337 if self._state.value == State.STARTED:
1338 raise ProcessError("Already started SharedMemoryServer")
1339 elif self._state.value == State.SHUTDOWN:
1340 raise ProcessError("SharedMemoryManager has shut down")
1341 else:
1342 raise ProcessError(
1343 "Unknown state {!r}".format(self._state.value))
1344 return self._Server(self._registry, self._address,
1345 self._authkey, self._serializer)
1346
1347 def SharedMemory(self, size):
1348 """Returns a new SharedMemory instance with the specified size in
1349 bytes, to be tracked by the manager."""
1350 with self._Client(self._address, authkey=self._authkey) as conn:
1351 sms = shared_memory.SharedMemory(None, create=True, size=size)
1352 try:
1353 dispatch(conn, None, 'track_segment', (sms.name,))
1354 except BaseException as e:
1355 sms.unlink()
1356 raise e
1357 return sms
1358
1359 def ShareableList(self, sequence):
1360 """Returns a new ShareableList instance populated with the values
1361 from the input sequence, to be tracked by the manager."""
1362 with self._Client(self._address, authkey=self._authkey) as conn:
1363 sl = shared_memory.ShareableList(sequence)
1364 try:
1365 dispatch(conn, None, 'track_segment', (sl.shm.name,))
1366 except BaseException as e:
1367 sl.shm.unlink()
1368 raise e
1369 return sl