blob: b6b4cdd9ac15eb4b5fc4d08a4979bb9ad7841f8c [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Davin Pottse895de32019-02-23 22:08:16 -06002# Module providing manager classes for dealing
Benjamin Petersone711caf2008-06-11 16:44:04 +00003# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Miss Islington (bot)8ece98a2021-08-09 10:39:05 -070011__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
Benjamin Petersone711caf2008-06-11 16:44:04 +000012
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
Pierre Glaserd0d64ad2019-05-10 20:42:35 +020019import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020022import time
Batuhan Taşkaya03615562020-04-10 17:46:36 +030023import types
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020024import os
Davin Pottse895de32019-02-23 22:08:16 -060025from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000026
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from traceback import format_exc
28
29from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050030from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031from . import pool
32from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010033from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010034from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060035try:
36 from . import shared_memory
Davin Pottse895de32019-02-23 22:08:16 -060037except ImportError:
38 HAS_SHMEM = False
Miss Islington (bot)8ece98a2021-08-09 10:39:05 -070039else:
40 HAS_SHMEM = True
41 __all__.append('SharedMemoryManager')
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
Benjamin Petersone711caf2008-06-11 16:44:04 +000043#
Benjamin Petersone711caf2008-06-11 16:44:04 +000044# Register some things for pickling
45#
46
def reduce_array(a):
    """Reduce an ``array.array`` to a ``(callable, args)`` pair for pickling.

    The array is rebuilt by calling ``array.array`` with its typecode and
    the raw bytes of its contents.
    """
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
# Arrays are pickled through reduce_array().
reduction.register(array.array, reduce_array)

# The concrete types of the dict views returned by items(), keys() and
# values(), in that order.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if list is not view_types[0]:       # only needed in Py3.0
    def rebuild_as_list(obj):
        """Reduce a dict view so that it is rebuilt as a plain list."""
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000057
58#
59# Type for identifying shared objects
60#
61
class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The state is the plain (typeid, address, id) triple.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Unpack first so a malformed state leaves the token untouched.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000080
81#
82# Function for communication with a manager's server process
83#
84
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple (id, methodname, args, kwds).  A '#RETURN'
    reply yields its payload; any other reply kind is turned into an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
94
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into an exception object (returned, not raised)

    '#ERROR' replies already carry the exception.  '#TRACEBACK' and
    '#UNSERIALIZABLE' replies must carry a str, which is wrapped in a
    RemoteError; a non-str payload raises TypeError.  Any other kind
    gives a ValueError.
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109
class RemoteError(Exception):
    '''
    Exception whose first argument is text reported by the manager server
    '''
    def __str__(self):
        # Frame the remote text between two 75-character rules.
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
113
114#
115# Functions for finding the method names of an object
116#
117
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name counts as a method when the attribute it refers to is callable.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
128
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] != '_':
            public.append(name)
    return public
134
135#
136# Server which is run in a process controlled by a manager
137#
138
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods which may be invoked through handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the empty object tables

        `registry` maps a typeid to a tuple
        (callable, exposed, method_to_typeid, proxytype); `serializer` is a
        key of `listener_client`.  Raises TypeError if `authkey` is not bytes.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        self.id_to_obj = {'0': (None, ())}
        # ident -> reference count
        self.id_to_refcount = {}
        # ident -> entry consulted when ident is missing from id_to_obj
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                # Poll so the loop ends soon after shutdown() sets the event.
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def _handle_request(self, c):
        '''
        Authenticate connection `c`, run one public method and send the reply
        '''
        request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # This check decides which attributes a client may call, so it
            # must not be an `assert` statement: those are removed under -O.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)

        try:
            c.send(msg)
        except Exception as e:
            # Best effort: try to report the failure to send to the client.
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

    def handle_request(self, conn):
        '''
        Handle a new connection
        '''
        try:
            self._handle_request(conn)
        except SystemExit:
            # Server.serve_client() calls sys.exit(0) on EOF
            pass
        finally:
            conn.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to the local-proxy table before giving up.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result is registered as a new shared object
                        # and the client is sent a token for it.
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # Some method names are served by fallback_mapping.
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Handlers used by serve_client() after an AttributeError.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            # Deliberately broad: whatever goes wrong while replying is
            # printed, and the stop event is still set below.
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, /, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns the pair (ident, exposed) where `exposed` is a tuple of
        method names.  Raises ValueError when the registered callable is
        None and the call is not exactly one positional argument, and
        TypeError when the registered method_to_typeid is not a dict.
        '''
        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes self.mutex itself, so call it after releasing it.
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one reference to the shared object `ident`

        Raises KeyError if `ident` is known neither to id_to_refcount nor
        to id_to_local_proxy_obj.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Drop one reference to `ident`; dispose of the object at zero

        Raises AssertionError if the recorded refcount is not positive.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
467
Benjamin Petersone711caf2008-06-11 16:44:04 +0000468
469#
470# Class to represent state of a manager
471#
472
class State(object):
    '''
    Holder for a manager's lifecycle state in its `value` attribute
    '''
    __slots__ = ['value']
    # The values which `value` is set to or compared against.
    INITIAL, STARTED, SHUTDOWN = range(3)
478
479#
480# Mapping from serializer name to Listener and Client types
481#
482
# Each serializer name maps to the (Listener, Client) pair used with it.
listener_client = {}
listener_client['pickle'] = (connection.Listener, connection.Client)
listener_client['xmlrpclib'] = (connection.XmlListener, connection.XmlClient)
487
488#
489# Definition of BaseManager
490#
491
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    # Server class instantiated by get_server() and _run_server()
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection parameters; no process is started here

        When `authkey` is None the current process's authkey is used, and
        when `ctx` is falsy get_context() supplies it.  `serializer` must
        be a key of `listener_client`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is in its initial state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the _Server class attribute, as _run_server() does, so that a
        # subclass overriding it gets the same server class on both paths.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # This connection only checks that the server answers.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is given, the server process calls
        initializer(*initargs) before creating the server.  Raises
        ProcessError unless the manager is in its initial state, and
        TypeError if `initializer` is neither None nor callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # _process is only assigned by start(); a manager that was never
        # started (or only connect()ed) has nothing to join.
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: a failed request is ignored and the process
                # is joined (and terminated if still alive) below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        When `proxytype` is None, AutoProxy is used.  A falsy `exposed` or
        `method_to_typeid` falls back to the proxy type's `_exposed_` or
        `_method_to_typeid_` attribute.  When `create_method` is true, a
        method named `typeid` is added to the class; it creates the shared
        object and returns a proxy for it.
        '''
        # Give this class its own registry instead of changing an
        # inherited one.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items(): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Server.create() took one reference and the proxy has been
                # constructed, so give the creation reference back.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
733
734#
# Subclass of set which gets cleared after a fork
736#
737
class ProcessLocalSet(set):
    '''
    Set registered with util.register_after_fork() to be cleared
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # Reduce to a bare constructor call with no arguments.
        return (type(self), ())
743
744#
745# Definition of BaseProxy
746#
747
748class BaseProxy(object):
749 '''
750 A base for proxies of shared objects
751 '''
752 _address_to_local = {}
753 _mutex = util.ForkAwareThreadLock()
754
755 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500756 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100757 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000758 tls_idset = BaseProxy._address_to_local.get(token.address, None)
759 if tls_idset is None:
760 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
761 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000762
763 # self._tls is used to record the connection used by this
764 # thread to communicate with the manager at token.address
765 self._tls = tls_idset[0]
766
767 # self._idset is used to record the identities of all shared
768 # objects for which the current process owns references and
769 # which are in the manager at token.address
770 self._idset = tls_idset[1]
771
772 self._token = token
773 self._id = self._token.id
774 self._manager = manager
775 self._serializer = serializer
776 self._Client = listener_client[serializer][1]
777
Davin Potts86a76682016-09-07 18:48:01 -0500778 # Should be set to True only when a proxy object is being created
779 # on the manager server; primary use case: nested proxy objects.
780 # RebuildProxy detects when a proxy is being created on the manager
781 # and sets this value appropriately.
782 self._owned_by_manager = manager_owned
783
Benjamin Petersone711caf2008-06-11 16:44:04 +0000784 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100785 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786 elif self._manager is not None:
787 self._authkey = self._manager._authkey
788 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100789 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000790
791 if incref:
792 self._incref()
793
794 util.register_after_fork(self, BaseProxy._after_fork)
795
796 def _connect(self):
797 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100798 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000799 if threading.current_thread().name != 'MainThread':
800 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000801 conn = self._Client(self._token.address, authkey=self._authkey)
802 dispatch(conn, None, 'accept_connection', (name,))
803 self._tls.connection = conn
804
805 def _callmethod(self, methodname, args=(), kwds={}):
806 '''
Galdenc6066242020-04-18 14:58:29 +0800807 Try to call a method of the referent and return a copy of the result
Benjamin Petersone711caf2008-06-11 16:44:04 +0000808 '''
809 try:
810 conn = self._tls.connection
811 except AttributeError:
812 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000813 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000814 self._connect()
815 conn = self._tls.connection
816
817 conn.send((self._id, methodname, args, kwds))
818 kind, result = conn.recv()
819
820 if kind == '#RETURN':
821 return result
822 elif kind == '#PROXY':
823 exposed, token = result
824 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100825 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000826 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000827 token, self._serializer, manager=self._manager,
828 authkey=self._authkey, exposed=exposed
829 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000830 conn = self._Client(token.address, authkey=self._authkey)
831 dispatch(conn, None, 'decref', (token.id,))
832 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000833 raise convert_to_error(kind, result)
834
835 def _getvalue(self):
836 '''
837 Get a copy of the value of the referent
838 '''
839 return self._callmethod('#GETVALUE')
840
841 def _incref(self):
Davin Potts86a76682016-09-07 18:48:01 -0500842 if self._owned_by_manager:
843 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
844 return
845
Benjamin Petersone711caf2008-06-11 16:44:04 +0000846 conn = self._Client(self._token.address, authkey=self._authkey)
847 dispatch(conn, None, 'incref', (self._id,))
848 util.debug('INCREF %r', self._token.id)
849
850 self._idset.add(self._id)
851
852 state = self._manager and self._manager._state
853
854 self._close = util.Finalize(
855 self, BaseProxy._decref,
856 args=(self._token, self._authkey, state,
857 self._tls, self._idset, self._Client),
858 exitpriority=10
859 )
860
861 @staticmethod
862 def _decref(token, authkey, state, tls, idset, _Client):
863 idset.discard(token.id)
864
865 # check whether manager is still alive
866 if state is None or state.value == State.STARTED:
867 # tell manager this process no longer cares about referent
868 try:
869 util.debug('DECREF %r', token.id)
870 conn = _Client(token.address, authkey=authkey)
871 dispatch(conn, None, 'decref', (token.id,))
872 except Exception as e:
873 util.debug('... decref failed %s', e)
874
875 else:
876 util.debug('DECREF %r -- manager already shutdown', token.id)
877
878 # check whether we can close this thread's connection because
879 # the process owns no more references to objects for this manager
880 if not idset and hasattr(tls, 'connection'):
881 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000882 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883 tls.connection.close()
884 del tls.connection
885
886 def _after_fork(self):
887 self._manager = None
888 try:
889 self._incref()
890 except Exception as e:
891 # the proxy may just be for a manager which has shutdown
892 util.info('incref failed: %s' % e)
893
894 def __reduce__(self):
895 kwds = {}
Davin Potts54586472016-09-09 18:03:10 -0500896 if get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000897 kwds['authkey'] = self._authkey
898
899 if getattr(self, '_isauto', False):
900 kwds['exposed'] = self._exposed_
901 return (RebuildProxy,
902 (AutoProxy, self._token, self._serializer, kwds))
903 else:
904 return (RebuildProxy,
905 (type(self), self._token, self._serializer, kwds))
906
907 def __deepcopy__(self, memo):
908 return self._getvalue()
909
910 def __repr__(self):
Serhiy Storchaka465e60e2014-07-25 23:36:00 +0300911 return '<%s object, typeid %r at %#x>' % \
912 (type(self).__name__, self._token.typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000913
914 def __str__(self):
915 '''
916 Return representation of the referent (or a fall-back if that fails)
917 '''
918 try:
919 return self._callmethod('__repr__')
920 except Exception:
921 return repr(self)[:-1] + "; '__str__()' failed>"
922
923#
924# Function used for unpickling
925#
926
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    `func` is called as func(token, serializer, incref=..., **kwds).
    '''
    this_process = process.current_process()
    server = getattr(this_process, '_manager_server', None)
    if server and server.address == token.address:
        # Unpickling inside the manager server that owns the referent.
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # No incref while the process is flagged as inheriting.
    wants_incref = kwds.pop('incref', True)
    inheriting = getattr(this_process, '_inheriting', False)
    incref = wants_incref and not inheriting
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000943
944#
945# Functions to create proxies and proxy types
946#
947
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` under (name, tuple(exposed)), so equal
    requests return the same class object.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # Generate one forwarding method per exposed name.
    namespace = {}
    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
968
969
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the method names are fetched with a
    'get_methods' request.  If `authkey` is None it falls back to the
    manager's key and then to the current process's key.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key *before* opening any connection, so the
    # 'get_methods' request below is not attempted with authkey=None when
    # a usable key is available from the manager or the current process.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # Marks the instance so that pickling rebuilds it through AutoProxy.
    proxy._isauto = True
    return proxy
994
995#
996# Types/callables which we will register with SyncManager
997#
998
class Namespace(object):
    '''
    Simple attribute container; keyword arguments become attributes
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # Attributes whose names start with '_' are left out; the
        # remaining "name=value" strings are listed in sorted order.
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
        )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001010
class Value(object):
    '''
    Holder for a single value together with its typecode

    `lock` is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)'%(cls_name, self._typecode, self._value)

    # attribute-style access to get()/set()
    value = property(get, set)
1022
def Array(typecode, sequence, lock=True):
    '''
    Return array.array(typecode, sequence); `lock` is accepted but unused
    '''
    new_array = array.array(typecode, sequence)
    return new_array
1025
1026#
1027# Proxy types used by SyncManager
1028#
1029
class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding the iterator/generator protocol to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy acts as its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
1042
1043
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release(); usable as a context manager
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when the caller supplied one.
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1055
1056
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until predicate() is true or `timeout` seconds have elapsed;
        return the last value of predicate()

        The predicate is evaluated in the calling process; only wait()
        is forwarded to the referent.
        '''
        outcome = predicate()
        if outcome:
            return outcome

        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not outcome:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome
1082
Benjamin Petersone711caf2008-06-11 16:44:04 +00001083
class EventProxy(BaseProxy):
    '''
    Proxy forwarding is_set/set/clear/wait to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        # the timeout is always forwarded, even when it is None
        return self._callmethod('wait', (timeout,))
1094
Richard Oudkerk3730a172012-06-15 18:26:07 +01001095
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier; the read-only attributes are fetched from the
    referent through '__getattribute__'
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1113
1114
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose public attributes live on the referent

    Names starting with '_' are handled locally on the proxy object;
    every other name is forwarded to the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # _callmethod is looked up through object to bypass this class's
        # own attribute hooks.
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__setattr__', (key, value))

    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__delattr__', (key,))
1132
1133
class ValueProxy(BaseProxy):
    '''
    Proxy exposing the referent's get()/set(), also as a `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    # attribute-style access routed through get()/set()
    value = property(get, set)

    # allow subscripting the class, e.g. ValueProxy[int]
    __class_getitem__ = classmethod(types.GenericAlias)
1143
Benjamin Petersone711caf2008-06-11 16:44:04 +00001144
# Generated base class: one forwarding method per exposed list method.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `proxy += value` extends the referent; the name stays bound
        # to this proxy.
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1158
1159
# Generated proxy type: one forwarding method per exposed dict method.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
# Maps a method name to the typeid used for that method's result.
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00001168
1169
# Generated proxy type exposing only length and item get/set.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001173
1174
# Generated base class: one forwarding method per exposed pool method.
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
# Maps a method name to the typeid used for that method's result.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    terminate()
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001191
1192#
1193# Definition of SyncManager
1194#
1195
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# queues: registered without an explicit proxy type
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# synchronization primitives from `threading`
SyncManager.register('Event', threading.Event, proxytype=EventProxy)
SyncManager.register('Lock', threading.Lock, proxytype=AcquirerProxy)
SyncManager.register('RLock', threading.RLock, proxytype=AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', threading.Condition,
                     proxytype=ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, proxytype=BarrierProxy)

# process pool
SyncManager.register('Pool', pool.Pool, proxytype=PoolProxy)

# containers and value holders
SyncManager.register('list', list, proxytype=ListProxy)
SyncManager.register('dict', dict, proxytype=DictProxy)
SyncManager.register('Value', Value, proxytype=ValueProxy)
SyncManager.register('Array', Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', Namespace, proxytype=NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001227
1228#
1229# Definition of SharedMemoryManager and SharedMemoryServer
1230#
1231
1232if HAS_SHMEM:
1233 class _SharedMemoryTracker:
1234 "Manages one or more shared memory segments."
1235
1236 def __init__(self, name, segment_names=[]):
1237 self.shared_memory_context_name = name
1238 self.segment_names = segment_names
1239
1240 def register_segment(self, segment_name):
1241 "Adds the supplied shared memory block name to tracker."
1242 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1243 self.segment_names.append(segment_name)
1244
1245 def destroy_segment(self, segment_name):
1246 """Calls unlink() on the shared memory block with the supplied name
1247 and removes it from the list of blocks being tracked."""
1248 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1249 self.segment_names.remove(segment_name)
1250 segment = shared_memory.SharedMemory(segment_name)
1251 segment.close()
1252 segment.unlink()
1253
1254 def unlink(self):
1255 "Calls destroy_segment() on all tracked shared memory blocks."
1256 for segment_name in self.segment_names[:]:
1257 self.destroy_segment(segment_name)
1258
1259 def __del__(self):
1260 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1261 self.unlink()
1262
1263 def __getstate__(self):
1264 return (self.shared_memory_context_name, self.segment_names)
1265
1266 def __setstate__(self, state):
1267 self.__init__(*state)
1268
1269
class SharedMemoryServer(Server):
    """Server that additionally keeps a tracker of shared memory block
    names and exposes requests to add, release and list them."""

    public = Server.public + \
             ['track_segment', 'release_segment', 'list_segments']

    def __init__(self, *args, **kwargs):
        Server.__init__(self, *args, **kwargs)
        # The address of Linux abstract namespaces can be bytes
        addr = self.address
        if isinstance(addr, bytes):
            addr = os.fsdecode(addr)
        tracker_name = f"shm_{addr}_{getpid()}"
        self.shared_memory_context = _SharedMemoryTracker(tracker_name)
        util.debug(f"SharedMemoryServer started by pid {getpid()}")

    def create(self, c, typeid, /, *args, **kwargs):
        """Create a new distributed-shared object (not backed by a shared
        memory block) and return its id to be used in a Proxy Object."""
        # Unless set up as a shared proxy, don't make shared_memory_context
        # a standard part of kwargs.  This makes things easier for supplying
        # simple functions.
        proxytype = self.registry[typeid][-1]
        if hasattr(proxytype, "_shared_memory_proxy"):
            kwargs['shared_memory_context'] = self.shared_memory_context
        return Server.create(self, c, typeid, *args, **kwargs)

    def shutdown(self, c):
        "Call unlink() on all tracked shared memory, terminate the Server."
        self.shared_memory_context.unlink()
        return Server.shutdown(self, c)

    def track_segment(self, c, segment_name):
        "Adds the supplied shared memory block name to Server's tracker."
        tracker = self.shared_memory_context
        tracker.register_segment(segment_name)

    def release_segment(self, c, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the tracker instance inside the Server."""
        tracker = self.shared_memory_context
        tracker.destroy_segment(segment_name)

    def list_segments(self, c):
        """Returns a list of names of shared memory blocks that the Server
        is currently tracking."""
        tracker = self.shared_memory_context
        return tracker.segment_names
1313
1314
class SharedMemoryManager(BaseManager):
    """Like SyncManager but uses SharedMemoryServer instead of Server.

    It provides methods for creating and returning SharedMemory instances
    and for creating a list-like object (ShareableList) backed by shared
    memory.  It also provides methods that create and return Proxy Objects
    that support synchronization across processes (i.e. multi-process-safe
    locks and semaphores).
    """

    _Server = SharedMemoryServer

    def __init__(self, *args, **kwargs):
        if os.name == "posix":
            # bpo-36867: Ensure the resource_tracker is running before
            # launching the manager process, so that concurrent
            # shared_memory manipulation both in the manager and in the
            # current process does not create two resource_tracker
            # processes.
            from . import resource_tracker
            resource_tracker.ensure_running()
        BaseManager.__init__(self, *args, **kwargs)
        util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

    def __del__(self):
        # Only logs; no cleanup is performed here.
        util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

    def get_server(self):
        'Better than monkeypatching for now; merge into Server ultimately'
        current = self._state.value
        if current != State.INITIAL:
            # A server may only be handed out before the manager starts.
            if current == State.STARTED:
                reason = "Already started SharedMemoryServer"
            elif current == State.SHUTDOWN:
                reason = "SharedMemoryManager has shut down"
            else:
                reason = "Unknown state {!r}".format(current)
            raise ProcessError(reason)
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def SharedMemory(self, size):
        """Returns a new SharedMemory instance with the specified size in
        bytes, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            block = shared_memory.SharedMemory(None, create=True, size=size)
            try:
                dispatch(conn, None, 'track_segment', (block.name,))
            except BaseException as exc:
                # The manager is not tracking the block, so unlink it here.
                block.unlink()
                raise exc
        return block

    def ShareableList(self, sequence):
        """Returns a new ShareableList instance populated with the values
        from the input sequence, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            shared_list = shared_memory.ShareableList(sequence)
            try:
                dispatch(conn, None, 'track_segment',
                         (shared_list.shm.name,))
            except BaseException as exc:
                # The manager is not tracking the block, so unlink it here.
                shared_list.shm.unlink()
                raise exc
        return shared_list