blob: b981b0e1cb8ed80a71a8828c3f52574a05d60303 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Davin Pottse895de32019-02-23 22:08:16 -06002# Module providing manager classes for dealing
Benjamin Petersone711caf2008-06-11 16:44:04 +00003# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
# Public names of this module.
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
            'SharedMemoryManager' ]
Benjamin Petersone711caf2008-06-11 16:44:04 +000013
14#
15# Imports
16#
17
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import threading
Pierre Glaserd0d64ad2019-05-10 20:42:35 +020020import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000022import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020023import time
Batuhan Taşkaya03615562020-04-10 17:46:36 +030024import types
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020025import os
Davin Pottse895de32019-02-23 22:08:16 -060026from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010028from traceback import format_exc
29
30from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050031from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010032from . import pool
33from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010034from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010035from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060036try:
37 from . import shared_memory
38 HAS_SHMEM = True
39except ImportError:
40 HAS_SHMEM = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Benjamin Petersone711caf2008-06-11 16:44:04 +000042#
Benjamin Petersone711caf2008-06-11 16:44:04 +000043# Register some things for pickling
44#
45
def reduce_array(a):
    '''
    Reduce array `a` to the pair (array.array, (typecode, raw bytes))
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
# Register reduce_array as the reducer for array.array.
reduction.register(array.array, reduce_array)

# Concrete types of the objects returned by {}.items(), {}.keys() and
# {}.values().
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        # Reduce a dict view to a plain list holding the same elements.
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000056
57#
58# Type for identifying shared objects
59#
60
class Token(object):
    '''
    Identifier for a shared object: a (typeid, address, id) triple
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # State is the plain triple, in slot order.
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        return '{}(typeid={!r}, address={!r}, id={!r})'.format(
            self.__class__.__name__, self.typeid, self.address, self.id)
Benjamin Petersone711caf2008-06-11 16:44:04 +000079
80#
81# Function for communication with a manager's server process
82#
83
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send the request (id, methodname, args, kwds) over connection `c` and
    return the result of a '#RETURN' reply; any other reply is raised as
    the exception built by convert_to_error()
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
93
def convert_to_error(kind, result):
    '''
    Build the exception instance for a reply of the given `kind`

    '#ERROR' replies already carry the exception; '#TRACEBACK' and
    '#UNSERIALIZABLE' replies carry text that is wrapped in a RemoteError;
    any other kind gives a ValueError.  Raises TypeError if the text of a
    '#TRACEBACK' or '#UNSERIALIZABLE' reply is not a str.
    '''
    if kind == '#ERROR':
        return result
    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))
    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))
    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108
class RemoteError(Exception):
    '''
    Exception whose str() shows its first argument between two rules of
    75 dashes
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
112
113#
114# Functions for finding the method names of an object
115#
116
def all_methods(obj):
    '''
    Return the names of all callable attributes of `obj`, in dir() order
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
127
def public_methods(obj):
    '''
    Return the names of the methods of `obj` whose first character is
    not an underscore
    '''
    names = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        names.append(name)
    return names
133
134#
135# Server which is run in a process controlled by a manager
136#
137
138class Server(object):
139 '''
140 Server class which runs in a process controlled by a manager object
141 '''
142 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
143 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
144
145 def __init__(self, registry, address, authkey, serializer):
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500146 if not isinstance(authkey, bytes):
147 raise TypeError(
148 "Authkey {0!r} is type {1!s}, not bytes".format(
149 authkey, type(authkey)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000150 self.registry = registry
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100151 self.authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000152 Listener, Client = listener_client[serializer]
153
154 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100155 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000156 self.address = self.listener.address
157
Jesse Noller63b3a972009-01-21 02:15:48 +0000158 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159 self.id_to_refcount = {}
Davin Potts86a76682016-09-07 18:48:01 -0500160 self.id_to_local_proxy_obj = {}
161 self.mutex = threading.Lock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162
    def serve_forever(self):
        '''
        Run the server forever

        Blocks until the stop event is set (or KeyboardInterrupt/SystemExit
        is raised while waiting), then always finishes with sys.exit(0).
        '''
        # Event polled by the loop below; shutdown() sets it.
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            # Connections are accepted on a separate daemon thread.
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                # Wait in one-second slices, re-checking the event each time.
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)
184
185 def accepter(self):
186 while True:
187 try:
188 c = self.listener.accept()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200189 except OSError:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100190 continue
191 t = threading.Thread(target=self.handle_request, args=(c,))
192 t.daemon = True
193 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000194
Victor Stinner7c29ae12021-04-16 19:42:34 +0200195 def _handle_request(self, c):
196 request = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197 try:
198 connection.deliver_challenge(c, self.authkey)
199 connection.answer_challenge(c, self.authkey)
200 request = c.recv()
201 ignore, funcname, args, kwds = request
202 assert funcname in self.public, '%r unrecognized' % funcname
203 func = getattr(self, funcname)
204 except Exception:
205 msg = ('#TRACEBACK', format_exc())
206 else:
207 try:
208 result = func(c, *args, **kwds)
209 except Exception:
210 msg = ('#TRACEBACK', format_exc())
211 else:
212 msg = ('#RETURN', result)
Victor Stinner7c29ae12021-04-16 19:42:34 +0200213
Benjamin Petersone711caf2008-06-11 16:44:04 +0000214 try:
215 c.send(msg)
216 except Exception as e:
217 try:
218 c.send(('#TRACEBACK', format_exc()))
219 except Exception:
220 pass
221 util.info('Failure to send message: %r', msg)
222 util.info(' ... request was %r', request)
223 util.info(' ... exception was %r', e)
224
    def handle_request(self, conn):
        '''
        Handle a new connection

        Runs _handle_request() on `conn` and always closes `conn`
        afterwards; a SystemExit raised while handling is swallowed.
        '''
        try:
            self._handle_request(conn)
        except SystemExit:
            # Server.serve_client() calls sys.exit(0) on EOF
            pass
        finally:
            conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000236
    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request is a 4-tuple
        (ident, methodname, args, kwds); the reply is one of '#RETURN',
        '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.  Exits the
        thread with sys.exit(0) on EOF and sys.exit(1) if no reply could
        be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                # Reset so the AttributeError handler below can tell whether
                # the request was decoded before the failure.
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Not tracked: fall back to the stashed local proxy
                    # objects, re-raising the first KeyError if absent.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # The exception object itself is sent to the caller.
                    msg = ('#ERROR', e)
                else:
                    # If the method has a registered typeid, share the
                    # result as a new object and reply with its token.
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # Try the handlers in fallback_mapping; a missing entry
                    # (KeyError) is reported as a traceback like any error.
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    # The reply could not be sent; report that instead.
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)
317
    def fallback_getvalue(self, conn, ident, obj):
        '''
        Fallback for '#GETVALUE': return the referent itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Fallback for '__str__': return str() of the referent
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Fallback for '__repr__': return repr() of the referent
        '''
        return repr(obj)

    # Method name -> handler, consulted by serve_client() when handling a
    # request raised AttributeError.  The values are plain functions, so
    # serve_client() passes `self` explicitly.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }
332
    def dummy(self, c):
        '''
        Do nothing; BaseManager.connect() sends this request
        '''
        pass
335
336 def debug_info(self, c):
337 '''
338 Return some info --- useful to spot problems with refcounting
339 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500340 # Perhaps include debug info about 'c'?
Charles-François Natalia924fc72014-05-25 14:12:12 +0100341 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000342 result = []
Davin Potts86a76682016-09-07 18:48:01 -0500343 keys = list(self.id_to_refcount.keys())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000344 keys.sort()
345 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000346 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000347 result.append(' %s: refcount=%s\n %s' %
348 (ident, self.id_to_refcount[ident],
349 str(self.id_to_obj[ident][0])[:75]))
350 return '\n'.join(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000351
    def number_of_objects(self, c):
        '''
        Number of shared objects

        Counts the entries of id_to_refcount; the connection `c` is unused.
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000358
    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c` and then sets the stop event that
        serve_forever() waits on.  The event is set even if sending the
        acknowledgement fails.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            # Any failure here is only printed, not propagated.
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000371
Serhiy Storchaka142566c2019-06-05 18:22:31 +0300372 def create(self, c, typeid, /, *args, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000373 '''
374 Create a new shared object and return its id
375 '''
Charles-François Natalia924fc72014-05-25 14:12:12 +0100376 with self.mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000377 callable, exposed, method_to_typeid, proxytype = \
378 self.registry[typeid]
379
380 if callable is None:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500381 if kwds or (len(args) != 1):
382 raise ValueError(
383 "Without callable, must have one non-keyword argument")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 obj = args[0]
385 else:
386 obj = callable(*args, **kwds)
387
388 if exposed is None:
389 exposed = public_methods(obj)
390 if method_to_typeid is not None:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500391 if not isinstance(method_to_typeid, dict):
392 raise TypeError(
393 "Method_to_typeid {0!r}: type {1!s}, not dict".format(
394 method_to_typeid, type(method_to_typeid)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395 exposed = list(exposed) + list(method_to_typeid)
396
397 ident = '%x' % id(obj) # convert to string because xmlrpclib
398 # only has 32 bit signed integers
399 util.debug('%r callable returned object with id %r', typeid, ident)
400
401 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
402 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000403 self.id_to_refcount[ident] = 0
Davin Potts86a76682016-09-07 18:48:01 -0500404
405 self.incref(c, ident)
406 return ident, tuple(exposed)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000407
408 def get_methods(self, c, token):
409 '''
410 Return the methods of the shared object indicated by token
411 '''
412 return tuple(self.id_to_obj[token.id][1])
413
    def accept_connection(self, c, name):
        '''
        Serve connection `c` in the current thread

        Renames the current thread to `name`, acknowledges the request and
        then runs serve_client() on `c`.
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)
421
422 def incref(self, c, ident):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100423 with self.mutex:
Davin Potts86a76682016-09-07 18:48:01 -0500424 try:
425 self.id_to_refcount[ident] += 1
426 except KeyError as ke:
427 # If no external references exist but an internal (to the
428 # manager) still does and a new external reference is created
429 # from it, restore the manager's tracking of it from the
430 # previously stashed internal ref.
431 if ident in self.id_to_local_proxy_obj:
432 self.id_to_refcount[ident] = 1
433 self.id_to_obj[ident] = \
434 self.id_to_local_proxy_obj[ident]
435 obj, exposed, gettypeid = self.id_to_obj[ident]
436 util.debug('Server re-enabled tracking & INCREF %r', ident)
437 else:
438 raise ke
Benjamin Petersone711caf2008-06-11 16:44:04 +0000439
    def decref(self, c, ident):
        '''
        Drop one reference to the shared object `ident` and dispose of the
        object once no references remain

        Raises AssertionError if the stored refcount is not positive.
        '''
        # An ident that is no longer tracked but is still stashed in
        # id_to_local_proxy_obj is skipped.
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
466
Benjamin Petersone711caf2008-06-11 16:44:04 +0000467
468#
469# Class to represent state of a manager
470#
471
class State(object):
    '''
    Holder for a manager's state: `value` is set to one of the constants
    INITIAL, STARTED or SHUTDOWN
    '''
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2
477
478#
479# Mapping from serializer name to Listener and Client types
480#
481
# Each value is a (Listener type, Client type) pair, keyed by serializer name.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
486
487#
488# Definition of BaseManager
489#
490
491class BaseManager(object):
492 '''
493 Base class for managers
494 '''
495 _registry = {}
496 _Server = Server
497
    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Set up a manager in the INITIAL state

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of listener_client, and `ctx` defaults to the result
        of get_context().
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000509
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 def get_server(self):
511 '''
512 Return server object with serve_forever() method and address attribute
513 '''
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500514 if self._state.value != State.INITIAL:
515 if self._state.value == State.STARTED:
516 raise ProcessError("Already started server")
517 elif self._state.value == State.SHUTDOWN:
518 raise ProcessError("Manager has shut down")
519 else:
520 raise ProcessError(
521 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000522 return Server(self._registry, self._address,
523 self._authkey, self._serializer)
524
525 def connect(self):
526 '''
527 Connect manager object to the server process
528 '''
529 Listener, Client = listener_client[self._serializer]
530 conn = Client(self._address, authkey=self._authkey)
531 dispatch(conn, None, 'dummy')
532 self._state.value = State.STARTED
533
    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        `initializer`, if given, must be callable; it is passed to the
        server process together with `initargs`.  Raises ProcessError
        unless the manager is in the INITIAL state and TypeError if
        `initializer` is not callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        # Name the process after the manager class and the process identity.
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )
576
    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it

        This is the target of the process spawned by start().  The
        server's address is sent over `writer`, which is then closed.
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Run the optional initializer before the server is created.
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()
599
Serhiy Storchaka2085bd02019-06-01 11:00:15 +0300600 def _create(self, typeid, /, *args, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000601 '''
602 Create a new shared object; return the token and exposed tuple
603 '''
604 assert self._state.value == State.STARTED, 'server not yet started'
605 conn = self._Client(self._address, authkey=self._authkey)
606 try:
607 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
608 finally:
609 conn.close()
610 return Token(typeid, self._address, id), exposed
611
612 def join(self, timeout=None):
613 '''
614 Join the manager process (if it has been spawned)
615 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100616 if self._process is not None:
617 self._process.join(timeout)
618 if not self._process.is_alive():
619 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000620
621 def _debug_info(self):
622 '''
623 Return some info about the servers shared objects and connections
624 '''
625 conn = self._Client(self._address, authkey=self._authkey)
626 try:
627 return dispatch(conn, None, 'debug_info')
628 finally:
629 conn.close()
630
631 def _number_of_objects(self):
632 '''
633 Return the number of shared objects
634 '''
635 conn = self._Client(self._address, authkey=self._authkey)
636 try:
637 return dispatch(conn, None, 'number_of_objects')
638 finally:
639 conn.close()
640
641 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100642 if self._state.value == State.INITIAL:
643 self.start()
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500644 if self._state.value != State.STARTED:
645 if self._state.value == State.INITIAL:
646 raise ProcessError("Unable to start server")
647 elif self._state.value == State.SHUTDOWN:
648 raise ProcessError("Manager has shut down")
649 else:
650 raise ProcessError(
651 "Unknown state {!r}".format(self._state.value))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000652 return self
653
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Calls the `shutdown` finalizer that start() installs; the
        # exception details are ignored.
        self.shutdown()
656
    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer

        If `process` is alive, a 'shutdown' request is sent to `address`
        and the process is joined for up to one second; if it is still
        alive after that, it is terminated.  In every case `state` is set
        to SHUTDOWN and the BaseProxy bookkeeping for `address` is
        dropped.
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: a failed request falls through to the
                # join/terminate steps below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass
688
    @property
    def address(self):
        # Read-only view of self._address, which start() replaces with the
        # address reported by the server process.
        return self._address
Benjamin Petersone711caf2008-06-11 16:44:04 +0000692
693 @classmethod
694 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
695 method_to_typeid=None, create_method=True):
696 '''
697 Register a typeid with the manager type
698 '''
699 if '_registry' not in cls.__dict__:
700 cls._registry = cls._registry.copy()
701
702 if proxytype is None:
703 proxytype = AutoProxy
704
705 exposed = exposed or getattr(proxytype, '_exposed_', None)
706
707 method_to_typeid = method_to_typeid or \
708 getattr(proxytype, '_method_to_typeid_', None)
709
710 if method_to_typeid:
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500711 for key, value in list(method_to_typeid.items()): # isinstance?
Benjamin Petersone711caf2008-06-11 16:44:04 +0000712 assert type(key) is str, '%r is not a string' % key
713 assert type(value) is str, '%r is not a string' % value
714
715 cls._registry[typeid] = (
716 callable, exposed, method_to_typeid, proxytype
717 )
718
719 if create_method:
Serhiy Storchaka2085bd02019-06-01 11:00:15 +0300720 def temp(self, /, *args, **kwds):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000721 util.debug('requesting creation of a shared %r object', typeid)
722 token, exp = self._create(typeid, *args, **kwds)
723 proxy = proxytype(
724 token, self._serializer, manager=self,
725 authkey=self._authkey, exposed=exp
726 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000727 conn = self._Client(token.address, authkey=self._authkey)
728 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729 return proxy
730 temp.__name__ = typeid
731 setattr(cls, typeid, temp)
732
733#
734# Subclass of set which get cleared after a fork
735#
736
class ProcessLocalSet(set):
    # Set that starts empty and registers itself to be cleared after a fork.
    def __init__(self):
        util.register_after_fork(self, lambda obj: obj.clear())
    def __reduce__(self):
        # Reduce to a call of the type with no arguments; the elements
        # are not included.
        return type(self), ()
742
743#
744# Definition of BaseProxy
745#
746
747class BaseProxy(object):
748 '''
749 A base for proxies of shared objects
750 '''
751 _address_to_local = {}
752 _mutex = util.ForkAwareThreadLock()
753
    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Set up a proxy for the shared object identified by `token`

        `serializer` selects the Client type through listener_client.
        The authkey used is `authkey` if given, else the manager's, else
        the current process's.  If `incref` is true, _incref() is called.
        `exposed` is not used here.
        '''
        # One (thread-local storage, id set) pair is shared by all proxies
        # for the same manager address.
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)
794
795 def _connect(self):
796 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100797 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000798 if threading.current_thread().name != 'MainThread':
799 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000800 conn = self._Client(self._token.address, authkey=self._authkey)
801 dispatch(conn, None, 'accept_connection', (name,))
802 self._tls.connection = conn
803
804 def _callmethod(self, methodname, args=(), kwds={}):
805 '''
Galdenc6066242020-04-18 14:58:29 +0800806 Try to call a method of the referent and return a copy of the result
Benjamin Petersone711caf2008-06-11 16:44:04 +0000807 '''
808 try:
809 conn = self._tls.connection
810 except AttributeError:
811 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000812 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813 self._connect()
814 conn = self._tls.connection
815
816 conn.send((self._id, methodname, args, kwds))
817 kind, result = conn.recv()
818
819 if kind == '#RETURN':
820 return result
821 elif kind == '#PROXY':
822 exposed, token = result
823 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100824 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000825 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 token, self._serializer, manager=self._manager,
827 authkey=self._authkey, exposed=exposed
828 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000829 conn = self._Client(token.address, authkey=self._authkey)
830 dispatch(conn, None, 'decref', (token.id,))
831 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000832 raise convert_to_error(kind, result)
833
834 def _getvalue(self):
835 '''
836 Get a copy of the value of the referent
837 '''
838 return self._callmethod('#GETVALUE')
839
840 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500841 if self._owned_by_manager:
842 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
843 return
844
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845 conn = self._Client(self._token.address, authkey=self._authkey)
846 dispatch(conn, None, 'incref', (self._id,))
847 util.debug('INCREF %r', self._token.id)
848
849 self._idset.add(self._id)
850
851 state = self._manager and self._manager._state
852
853 self._close = util.Finalize(
854 self, BaseProxy._decref,
855 args=(self._token, self._authkey, state,
856 self._tls, self._idset, self._Client),
857 exitpriority=10
858 )
859
860 @staticmethod
861 def _decref(token, authkey, state, tls, idset, _Client):
862 idset.discard(token.id)
863
864 # check whether manager is still alive
865 if state is None or state.value == State.STARTED:
866 # tell manager this process no longer cares about referent
867 try:
868 util.debug('DECREF %r', token.id)
869 conn = _Client(token.address, authkey=authkey)
870 dispatch(conn, None, 'decref', (token.id,))
871 except Exception as e:
872 util.debug('... decref failed %s', e)
873
874 else:
875 util.debug('DECREF %r -- manager already shutdown', token.id)
876
877 # check whether we can close this thread's connection because
878 # the process owns no more references to objects for this manager
879 if not idset and hasattr(tls, 'connection'):
880 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000881 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882 tls.connection.close()
883 del tls.connection
884
885 def _after_fork(self):
886 self._manager = None
887 try:
888 self._incref()
889 except Exception as e:
890 # the proxy may just be for a manager which has shutdown
891 util.info('incref failed: %s' % e)
892
893 def __reduce__(self):
894 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500895 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000896 kwds['authkey'] = self._authkey
897
898 if getattr(self, '_isauto', False):
899 kwds['exposed'] = self._exposed_
900 return (RebuildProxy,
901 (AutoProxy, self._token, self._serializer, kwds))
902 else:
903 return (RebuildProxy,
904 (type(self), self._token, self._serializer, kwds))
905
906 def __deepcopy__(self, memo):
907 return self._getvalue()
908
909 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300910 return '<%s object, typeid %r at %#x>' % \
911 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000912
913 def __str__(self):
914 '''
915 Return representation of the referent (or a fall-back if that fails)
916 '''
917 try:
918 return self._callmethod('__repr__')
919 except Exception:
920 return repr(self)[:-1] + "; '__str__()' failed>"
921
922#
923# Function used for unpickling
924#
925
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    `func` is called as `func(token, serializer, incref=..., **kwds)`.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        # We are running inside the manager server that owns the referent
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        if token.id not in server.id_to_local_proxy_obj:
            server.id_to_local_proxy_obj[token.id] = \
                server.id_to_obj[token.id]

    # no incref if the caller disabled it or the process is `_inheriting`
    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942
943#
944# Functions to create proxies and proxy types
945#
946
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` (deliberately a shared default) under
    the key `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    namespace = {}

    # each exposed name becomes a method forwarding to `_callmethod`
    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
967
968
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    The proxy type is generated by `MakeProxyType()` from `exposed`; when
    `exposed` is None the method names are requested from the manager
    ('get_methods').  When `authkey` is None it falls back to the
    manager's key and then to the current process's key.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key *before* its first use, so that the 'get_methods'
    # connection below is made with the same key as the proxy itself
    # instead of with `authkey=None`.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marker read by BaseProxy.__reduce__ via getattr(self, '_isauto', ...)
    proxy._isauto = True
    return proxy
993
994#
995# Types/callables which we will register with SyncManager
996#
997
class Namespace(object):
    '''
    Simple attribute container; keyword arguments become attributes
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)
    def __repr__(self):
        # snapshot the items first, then show only the public attributes
        # as sorted "name=value" pairs
        snapshot = list(self.__dict__.items())
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in snapshot
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001009
class Value(object):
    '''
    Holder for a single value together with its typecode

    `lock` is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value
    def get(self):
        '''Return the stored value'''
        return self._value
    def set(self, value):
        '''Replace the stored value'''
        self._value = value
    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields
    value = property(get, set)
1021
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` filled from `sequence`

    `lock` is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
1024
1025#
1026# Proxy types used by SyncManager
1027#
1028
class IteratorProxy(BaseProxy):
    '''
    Proxy exposing the iterator/generator protocol of the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
1041
1042
class AcquirerProxy(BaseProxy):
    '''
    Proxy for referents with `acquire()`/`release()`; usable in `with`
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when it was actually given
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1054
1055
class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding `wait`, `notify`, `notify_all` and a local `wait_for`
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed

        `predicate` is evaluated in the calling process; the last value it
        returned is the result.
        '''
        result = predicate()
        if result:
            return result
        # absolute deadline on the monotonic clock, or None for "no limit"
        endtime = None if timeout is None else time.monotonic() + timeout
        waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
1081
Benjamin Petersone711caf2008-06-11 16:44:04 +00001082
class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait` to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
1093
Richard Oudkerk3730a172012-06-15 18:26:07 +01001094
class BarrierProxy(BaseProxy):
    '''
    Proxy forwarding `wait`, `abort` and `reset`; the read-only attributes
    are fetched from the referent with `__getattribute__`
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1112
1113
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attribute access is forwarded to the referent

    Names starting with an underscore are handled on the proxy itself;
    every other name goes through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # fetch `_callmethod` through object to bypass our own hooks
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__setattr__', (key, value))

    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__delattr__', (key,))
1131
1132
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get`/`set`, also available as the `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    value = property(get, set)

    # allows subscripting the class, e.g. ValueProxy[int]
    __class_getitem__ = classmethod(types.GenericAlias)
1142
Benjamin Petersone711caf2008-06-11 16:44:04 +00001143
# Generated base type: every listed name is forwarded to the referent.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `+=` is carried out on the referent as an extend()
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1157
1158
# Generated proxy type for dict referents.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# the result of __iter__ is associated with the 'Iterator' typeid
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00001167
1168
# Generated proxy type exposing length and item access only.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001172
1173
# Generated base type for pool referents (note: its type name is
# 'PoolProxy', the same as the subclass below).
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# typeids associated with the results of these methods
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    `terminate()`
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001190
1191#
1192# Definition of SyncManager
1193#
1194
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# queues (no dedicated proxy type is passed)
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# synchronization primitives from `threading`
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy
)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)

# pools, containers and value holders
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001226
1227#
1228# Definition of SharedMemoryManager and SharedMemoryServer
1229#
1230
1231if HAS_SHMEM:
1232 class _SharedMemoryTracker:
1233 "Manages one or more shared memory segments."
1234
1235 def __init__(self, name, segment_names=[]):
1236 self.shared_memory_context_name = name
1237 self.segment_names = segment_names
1238
1239 def register_segment(self, segment_name):
1240 "Adds the supplied shared memory block name to tracker."
1241 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1242 self.segment_names.append(segment_name)
1243
1244 def destroy_segment(self, segment_name):
1245 """Calls unlink() on the shared memory block with the supplied name
1246 and removes it from the list of blocks being tracked."""
1247 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1248 self.segment_names.remove(segment_name)
1249 segment = shared_memory.SharedMemory(segment_name)
1250 segment.close()
1251 segment.unlink()
1252
1253 def unlink(self):
1254 "Calls destroy_segment() on all tracked shared memory blocks."
1255 for segment_name in self.segment_names[:]:
1256 self.destroy_segment(segment_name)
1257
1258 def __del__(self):
1259 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1260 self.unlink()
1261
1262 def __getstate__(self):
1263 return (self.shared_memory_context_name, self.segment_names)
1264
1265 def __setstate__(self, state):
1266 self.__init__(*state)
1267
1268
1269 class SharedMemoryServer(Server):
1270
1271 public = Server.public + \
1272 ['track_segment', 'release_segment', 'list_segments']
1273
1274 def __init__(self, *args, **kwargs):
1275 Server.__init__(self, *args, **kwargs)
Pablo Galindo6012f302020-03-09 13:48:01 +00001276 address = self.address
1277 # The address of Linux abstract namespaces can be bytes
1278 if isinstance(address, bytes):
1279 address = os.fsdecode(address)
Davin Pottse895de32019-02-23 22:08:16 -06001280 self.shared_memory_context = \
Pablo Galindo6012f302020-03-09 13:48:01 +00001281 _SharedMemoryTracker(f"shm_{address}_{getpid()}")
Davin Pottse895de32019-02-23 22:08:16 -06001282 util.debug(f"SharedMemoryServer started by pid {getpid()}")
1283
Serhiy Storchaka142566c2019-06-05 18:22:31 +03001284 def create(self, c, typeid, /, *args, **kwargs):
Davin Pottse895de32019-02-23 22:08:16 -06001285 """Create a new distributed-shared object (not backed by a shared
1286 memory block) and return its id to be used in a Proxy Object."""
1287 # Unless set up as a shared proxy, don't make shared_memory_context
1288 # a standard part of kwargs. This makes things easier for supplying
1289 # simple functions.
1290 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1291 kwargs['shared_memory_context'] = self.shared_memory_context
Serhiy Storchaka142566c2019-06-05 18:22:31 +03001292 return Server.create(self, c, typeid, *args, **kwargs)
Davin Pottse895de32019-02-23 22:08:16 -06001293
1294 def shutdown(self, c):
1295 "Call unlink() on all tracked shared memory, terminate the Server."
1296 self.shared_memory_context.unlink()
1297 return Server.shutdown(self, c)
1298
1299 def track_segment(self, c, segment_name):
1300 "Adds the supplied shared memory block name to Server's tracker."
1301 self.shared_memory_context.register_segment(segment_name)
1302
1303 def release_segment(self, c, segment_name):
1304 """Calls unlink() on the shared memory block with the supplied name
1305 and removes it from the tracker instance inside the Server."""
1306 self.shared_memory_context.destroy_segment(segment_name)
1307
1308 def list_segments(self, c):
1309 """Returns a list of names of shared memory blocks that the Server
1310 is currently tracking."""
1311 return self.shared_memory_context.segment_names
1312
1313
1314 class SharedMemoryManager(BaseManager):
1315 """Like SyncManager but uses SharedMemoryServer instead of Server.
1316
1317 It provides methods for creating and returning SharedMemory instances
1318 and for creating a list-like object (ShareableList) backed by shared
1319 memory. It also provides methods that create and return Proxy Objects
1320 that support synchronization across processes (i.e. multi-process-safe
1321 locks and semaphores).
1322 """
1323
1324 _Server = SharedMemoryServer
1325
1326 def __init__(self, *args, **kwargs):
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02001327 if os.name == "posix":
1328 # bpo-36867: Ensure the resource_tracker is running before
1329 # launching the manager process, so that concurrent
1330 # shared_memory manipulation both in the manager and in the
1331 # current process does not create two resource_tracker
1332 # processes.
1333 from . import resource_tracker
1334 resource_tracker.ensure_running()
Davin Pottse895de32019-02-23 22:08:16 -06001335 BaseManager.__init__(self, *args, **kwargs)
1336 util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1337
1338 def __del__(self):
1339 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1340 pass
1341
1342 def get_server(self):
1343 'Better than monkeypatching for now; merge into Server ultimately'
1344 if self._state.value != State.INITIAL:
1345 if self._state.value == State.STARTED:
1346 raise ProcessError("Already started SharedMemoryServer")
1347 elif self._state.value == State.SHUTDOWN:
1348 raise ProcessError("SharedMemoryManager has shut down")
1349 else:
1350 raise ProcessError(
1351 "Unknown state {!r}".format(self._state.value))
1352 return self._Server(self._registry, self._address,
1353 self._authkey, self._serializer)
1354
1355 def SharedMemory(self, size):
1356 """Returns a new SharedMemory instance with the specified size in
1357 bytes, to be tracked by the manager."""
1358 with self._Client(self._address, authkey=self._authkey) as conn:
1359 sms = shared_memory.SharedMemory(None, create=True, size=size)
1360 try:
1361 dispatch(conn, None, 'track_segment', (sms.name,))
1362 except BaseException as e:
1363 sms.unlink()
1364 raise e
1365 return sms
1366
1367 def ShareableList(self, sequence):
1368 """Returns a new ShareableList instance populated with the values
1369 from the input sequence, to be tracked by the manager."""
1370 with self._Client(self._address, authkey=self._authkey) as conn:
1371 sl = shared_memory.ShareableList(sequence)
1372 try:
1373 dispatch(conn, None, 'track_segment', (sl.shm.name,))
1374 except BaseException as e:
1375 sl.shm.unlink()
1376 raise e
1377 return sl