blob: 514152298b097ccd26769a963d9d9a90630e85a8 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Davin Pottse895de32019-02-23 22:08:16 -06002# Module providing manager classes for dealing
Benjamin Petersone711caf2008-06-11 16:44:04 +00003# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Davin Pottse895de32019-02-23 22:08:16 -060011__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12 'SharedMemoryManager' ]
Benjamin Petersone711caf2008-06-11 16:44:04 +000013
14#
15# Imports
16#
17
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import threading
Pierre Glaserd0d64ad2019-05-10 20:42:35 +020020import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000022import queue
Victor Stinnerc2368cb2018-07-06 13:51:52 +020023import time
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020024import os
Davin Pottse895de32019-02-23 22:08:16 -060025from os import getpid
Benjamin Petersone711caf2008-06-11 16:44:04 +000026
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from traceback import format_exc
28
29from . import connection
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -050030from .context import reduction, get_spawning_popen, ProcessError
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031from . import pool
32from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010033from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010034from . import get_context
Davin Pottse895de32019-02-23 22:08:16 -060035try:
36 from . import shared_memory
37 HAS_SHMEM = True
38except ImportError:
39 HAS_SHMEM = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
Benjamin Petersone711caf2008-06-11 16:44:04 +000041#
Benjamin Petersone711caf2008-06-11 16:44:04 +000042# Register some things for pickling
43#
44
def reduce_array(a):
    '''
    Reduce the array `a` to a (constructor, arguments) pair for pickling

    The arguments are the array's typecode and its contents as bytes.
    '''
    constructor_args = (a.typecode, a.tobytes())
    return array.array, constructor_args
reduction.register(array.array, reduce_array)

# Register a reducer that turns the three dict view types into plain lists.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        '''
        Reduce `obj` to a list holding the same elements
        '''
        elements = list(obj)
        return list, (elements,)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000055
56#
57# Type for identifying shared objects
58#
59
class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The pickled state is the three fields, in slot order.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Unpack first so a malformed state leaves the token untouched.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
Benjamin Petersone711caf2008-06-11 16:44:04 +000078
79#
80# Function for communication with a manager's server process
81#
82
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple (id, methodname, args, kwds).  A '#RETURN'
    reply yields its payload; any other reply kind is converted to an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
92
def convert_to_error(kind, result):
    '''
    Return the exception object described by a non-'#RETURN' reply

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies must carry a str (otherwise
    TypeError is raised) and become a RemoteError.  Any other kind yields
    a ValueError.
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000107
class RemoteError(Exception):
    '''
    Exception whose text is its first argument framed by dashed rules
    '''
    def __str__(self):
        rule = '-'*75
        return ''.join(['\n', rule, '\n', str(self.args[0]), rule])
111
112#
113# Functions for finding the method names of an object
114#
115
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name counts as a method when the attribute it refers to is callable.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
126
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    visible = []
    for name in all_methods(obj):
        if name[0] != '_':
            visible.append(name)
    return visible
132
133#
134# Server which is run in a process controlled by a manager
135#
136
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods a client may invoke through handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener at `address` and the object bookkeeping tables

        `registry` maps a typeid to the tuple
        (callable, exposed, method_to_typeid, proxytype) and `serializer`
        is a key of `listener_client`.  Raises TypeError when `authkey` is
        not a bytes object.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, _ = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Starts the accepter thread and waits until the stop event is set,
        then resets stdout/stderr if they were replaced and exits the
        process with status 0.
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        The connection is closed on every exit path, including when the
        invoked method leaves through SystemExit (serve_client() calls
        sys.exit()), which would otherwise skip the close.
        '''
        try:
            self._handle_request(c)
        finally:
            c.close()

    def _handle_request(self, c):
        '''
        Authenticate `c`, run the single requested public method and reply
        '''
        request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit raise rather than `assert`: this is the check that
            # restricts clients to `public`, so it must survive `python -O`.
            # AssertionError is kept so the reported traceback is unchanged.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)

        try:
            c.send(msg)
        except Exception as e:
            # The reply itself could not be sent; try to report that instead.
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request is a tuple
        (ident, methodname, args, kwds); the reply is one of '#RETURN',
        '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.  The thread
        exits via sys.exit() on EOF or when a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to objects only referenced from inside the
                    # manager; report the first lookup failure if that
                    # fails too.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result is registered as a new shared object
                        # and a token for it is returned instead of a copy.
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Return `obj` itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Return str(obj)
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Return repr(obj)
        '''
        return repr(obj)

    # Handlers used by serve_client() when a request raised AttributeError.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c` and sets the stop event; the event
        is set even when sending the acknowledgement fails.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(*args, **kwds):
        '''
        Create a new shared object and return its id

        Called as create(self, c, typeid, *args, **kwds).  Passing `c` or
        `typeid` by keyword still works but emits a DeprecationWarning.
        Returns the pair (ident, exposed) where `exposed` is a tuple of the
        method names made available for the object.
        '''
        if len(args) >= 3:
            self, c, typeid, *args = args
        elif not args:
            raise TypeError("descriptor 'create' of 'Server' object "
                            "needs an argument")
        else:
            if 'typeid' not in kwds:
                raise TypeError('create expected at least 2 positional '
                                'arguments, got %d' % (len(args)-1))
            typeid = kwds.pop('typeid')
            if len(args) >= 2:
                self, c, *args = args
                import warnings
                warnings.warn("Passing 'typeid' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
            else:
                if 'c' not in kwds:
                    raise TypeError('create expected at least 2 positional '
                                    'arguments, got %d' % (len(args)-1))
                c = kwds.pop('c')
                self, *args = args
                import warnings
                warnings.warn("Passing 'c' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
        args = tuple(args)

        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                self.registry[typeid]

            if factory is None:
                # No factory registered: the single argument is the object.
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the mutex itself, so call it after releasing it.
        self.incref(c, ident)
        return ident, tuple(exposed)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)'

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`

        Raises KeyError when `ident` is neither tracked nor stashed in
        `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident not in self.id_to_local_proxy_obj:
                    raise
                self.id_to_refcount[ident] = 1
                self.id_to_obj[ident] = \
                    self.id_to_local_proxy_obj[ident]
                util.debug('Server re-enabled tracking & INCREF %r', ident)

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident`

        The object is dropped from the server once its count reaches zero.
        Raises AssertionError when the stored count is not positive.
        '''
        if ident not in self.id_to_refcount and \
                ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
484
Benjamin Petersone711caf2008-06-11 16:44:04 +0000485
486#
487# Class to represent state of a manager
488#
489
class State(object):
    '''
    Holder for a manager's state, stored in the `value` attribute
    '''
    __slots__ = ['value']

    # The states which `value` takes.
    INITIAL, STARTED, SHUTDOWN = range(3)
495
496#
497# Mapping from serializer name to Listener and Client types
498#
499
# Each serializer name maps to its (Listener, Client) pair.
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
)
504
505#
506# Definition of BaseManager
507#
508
class BaseManager(object):
    '''
    Base class for managers
    '''
    _registry = {}
    _Server = Server

    # Set by start(); the class-level default keeps join() usable on a
    # manager that never spawned a server process (e.g. after connect()).
    _process = None

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection parameters; no process is started here

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of `listener_client` and `ctx` defaults to
        get_context().
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is still in its initial state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class's configured server type, as _run_server() does.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # The connection is only needed for this one request.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is given it is called as initializer(*initargs)
        in the server process before the server is created.  Raises
        ProcessError unless the manager is in its initial state and
        TypeError if `initializer` is not callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(*args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        self, typeid, *args = args
        args = tuple(args)

        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        if self._process is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        '''
        Start the manager if it is still in its initial state; return self
        '''
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''
        Call the shutdown finalizer registered by start()
        '''
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            # Best effort: a failed request falls through to the
            # join/terminate sequence below.
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        '''
        The address stored for this manager's server
        '''
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and `method_to_typeid`
        fall back to the proxy type's `_exposed_` and `_method_to_typeid_`
        attributes.  When `create_method` is true, a method named `typeid`
        is added to the class; it creates a shared object and returns a
        proxy for it.
        '''
        # Give the class its own registry so that the registration does not
        # leak into the parent class.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items(): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Release the reference taken by the server-side create().
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
753
754#
# Subclass of set which gets cleared after a fork
756#
757
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork
    '''
    def __init__(self):
        def _clear(instance):
            instance.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # Reduce to an empty set of the same type; the elements are dropped.
        cls = type(self)
        return cls, ()
763
764#
765# Definition of BaseProxy
766#
767
768class BaseProxy(object):
769 '''
770 A base for proxies of shared objects
771 '''
772 _address_to_local = {}
773 _mutex = util.ForkAwareThreadLock()
774
775 def __init__(self, token, serializer, manager=None,
Davin Potts86a76682016-09-07 18:48:01 -0500776 authkey=None, exposed=None, incref=True, manager_owned=False):
Charles-François Natalia924fc72014-05-25 14:12:12 +0100777 with BaseProxy._mutex:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000778 tls_idset = BaseProxy._address_to_local.get(token.address, None)
779 if tls_idset is None:
780 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
781 BaseProxy._address_to_local[token.address] = tls_idset
Benjamin Petersone711caf2008-06-11 16:44:04 +0000782
783 # self._tls is used to record the connection used by this
784 # thread to communicate with the manager at token.address
785 self._tls = tls_idset[0]
786
787 # self._idset is used to record the identities of all shared
788 # objects for which the current process owns references and
789 # which are in the manager at token.address
790 self._idset = tls_idset[1]
791
792 self._token = token
793 self._id = self._token.id
794 self._manager = manager
795 self._serializer = serializer
796 self._Client = listener_client[serializer][1]
797
Davin Potts86a76682016-09-07 18:48:01 -0500798 # Should be set to True only when a proxy object is being created
799 # on the manager server; primary use case: nested proxy objects.
800 # RebuildProxy detects when a proxy is being created on the manager
801 # and sets this value appropriately.
802 self._owned_by_manager = manager_owned
803
Benjamin Petersone711caf2008-06-11 16:44:04 +0000804 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100805 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000806 elif self._manager is not None:
807 self._authkey = self._manager._authkey
808 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100809 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000810
811 if incref:
812 self._incref()
813
814 util.register_after_fork(self, BaseProxy._after_fork)
815
def _connect(self):
    '''
    Open a connection to the manager and remember it for this thread

    The connection is announced to the server under the current process
    name, with the thread name appended for any thread other than
    'MainThread', and is then stored on `self._tls`.
    '''
    util.debug('making connection to manager')
    ident = process.current_process().name
    thread_name = threading.current_thread().name
    if thread_name != 'MainThread':
        ident = ident + '|' + thread_name
    channel = self._Client(self._token.address, authkey=self._authkey)
    dispatch(channel, None, 'accept_connection', (ident,))
    self._tls.connection = channel
824
def _callmethod(self, methodname, args=(), kwds={}):
    '''
    Try to call a method of the referent and return a copy of the result

    The request is sent over this thread's connection to the manager
    (one is created on first use).  A '#RETURN' reply yields the result
    directly; a '#PROXY' reply yields a new proxy built from the returned
    token; any other reply is converted to an exception and raised.
    '''
    # NOTE: `kwds` has a mutable default but is never mutated here; it is
    # only forwarded in the request tuple.
    try:
        conn = self._tls.connection
    except AttributeError:
        util.debug('thread %r does not own a connection',
                   threading.current_thread().name)
        self._connect()
        conn = self._tls.connection

    conn.send((self._id, methodname, args, kwds))
    kind, result = conn.recv()

    if kind == '#RETURN':
        return result
    elif kind == '#PROXY':
        exposed, token = result
        proxytype = self._manager._registry[token.typeid][-1]
        # The token is rewritten to use the address this proxy already
        # talks to before the new proxy is built from it.
        token.address = self._token.address
        proxy = proxytype(
            token, self._serializer, manager=self._manager,
            authkey=self._authkey, exposed=exposed
            )
        # One-shot connection for the decref request; presumably this
        # releases the reference held on the server side now that the new
        # proxy exists -- verify against the server implementation.
        # Close it explicitly instead of relying on garbage collection.
        decref_conn = self._Client(token.address, authkey=self._authkey)
        try:
            dispatch(decref_conn, None, 'decref', (token.id,))
        finally:
            decref_conn.close()
        return proxy
    raise convert_to_error(kind, result)
854
def _getvalue(self):
    '''
    Ask the manager for a copy of the referent's value and return it
    '''
    value_copy = self._callmethod('#GETVALUE')
    return value_copy
860
def _incref(self):
    '''
    Tell the manager that this proxy holds a reference to the referent

    Does nothing for proxies flagged as owned by the manager.  Otherwise
    an 'incref' request is sent, the id is recorded in `self._idset`, and
    a finalizer is registered that calls `BaseProxy._decref`.
    '''
    if self._owned_by_manager:
        util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
        return

    # One-shot connection: close it as soon as the request has completed
    # (or failed) rather than leaving it to garbage collection.
    conn = self._Client(self._token.address, authkey=self._authkey)
    try:
        dispatch(conn, None, 'incref', (self._id,))
    finally:
        conn.close()
    util.debug('INCREF %r', self._token.id)

    self._idset.add(self._id)

    # `state` is None (or another falsy value) when there is no manager.
    state = self._manager and self._manager._state

    self._close = util.Finalize(
        self, BaseProxy._decref,
        args=(self._token, self._authkey, state,
              self._tls, self._idset, self._Client),
        exitpriority=10
        )
880
@staticmethod
def _decref(token, authkey, state, tls, idset, _Client):
    '''
    Drop this process's reference to the object identified by `token`

    Used as the finalizer callback registered by `_incref`, so it takes
    everything it needs as arguments instead of a proxy instance.
    Failures while contacting the manager are logged, not raised.
    '''
    idset.discard(token.id)

    # check whether manager is still alive
    if state is None or state.value == State.STARTED:
        # tell manager this process no longer cares about referent
        try:
            util.debug('DECREF %r', token.id)
            conn = _Client(token.address, authkey=authkey)
            try:
                dispatch(conn, None, 'decref', (token.id,))
            finally:
                # One-shot connection: release it deterministically.
                conn.close()
        except Exception as e:
            util.debug('... decref failed %s', e)

    else:
        util.debug('DECREF %r -- manager already shutdown', token.id)

    # check whether we can close this thread's connection because
    # the process owns no more references to objects for this manager
    if not idset and hasattr(tls, 'connection'):
        util.debug('thread %r has no more proxies so closing conn',
                   threading.current_thread().name)
        tls.connection.close()
        del tls.connection
905
def _after_fork(self):
    '''
    Forget the manager object and try to incref the referent again

    A failing incref is only logged.
    '''
    self._manager = None
    try:
        self._incref()
    except Exception as exc:
        # the proxy may just be for a manager which has shutdown
        message = 'incref failed: %s' % exc
        util.info(message)
913
def __reduce__(self):
    '''
    Pickle support: describe how to rebuild this proxy via RebuildProxy
    '''
    rebuild_kwds = {}
    # The authkey is only included while a spawning Popen is active.
    if get_spawning_popen() is not None:
        rebuild_kwds['authkey'] = self._authkey

    is_auto = getattr(self, '_isauto', False)
    if is_auto:
        # Auto-proxies carry their exposed method names along and are
        # rebuilt through the AutoProxy factory.
        rebuild_kwds['exposed'] = self._exposed_
        factory = AutoProxy
    else:
        factory = type(self)
    return (RebuildProxy,
            (factory, self._token, self._serializer, rebuild_kwds))
926
def __deepcopy__(self, memo):
    '''
    Deep-copying a proxy yields a copy of the referent's value
    '''
    referent_copy = self._getvalue()
    return referent_copy
929
def __repr__(self):
    '''
    Return a description made of the proxy class name, typeid and id()
    '''
    cls_name = type(self).__name__
    typeid = self._token.typeid
    return '<%s object, typeid %r at %#x>' % (cls_name, typeid, id(self))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000933
def __str__(self):
    '''
    Return the referent's repr, or a marked-up proxy repr if that fails
    '''
    try:
        text = self._callmethod('__repr__')
    except Exception:
        # Drop the closing '>' of our own repr and append the failure note.
        text = repr(self)[:-1] + "; '__str__()' failed>"
    return text
942
943#
944# Function used for unpickling
945#
946
def RebuildProxy(func, token, serializer, kwds):
    '''
    Rebuild a proxy object while unpickling.

    When the current process carries a manager server at the token's
    address, the proxy is flagged as manager-owned and the referent is
    made reachable through the server's local-proxy table.
    '''
    local_server = getattr(process.current_process(), '_manager_server', None)
    if local_server and local_server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = local_server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = local_server.id_to_obj[token.id]

    # No incref if the caller opted out or the process is inheriting.
    incref = (
        kwds.pop('incref', True) and
        not getattr(process.current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000963
964#
965# Functions to create proxies and proxy types
966#
967
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoized in `_cache` by (name, exposed), so equal requests
    return the same class object.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    namespace = {}

    # Each exposed name becomes a method that forwards to _callmethod.
    for method_name in exposed:
        exec('''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (method_name, method_name),
             namespace)

    proxy_cls = type(name, (BaseProxy,), namespace)
    proxy_cls._exposed_ = exposed
    _cache[cache_key] = proxy_cls
    return proxy_cls
988
989
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the method names are fetched from the manager
    with a 'get_methods' request.  `manager_owned` is forwarded to the
    proxy constructor; RebuildProxy passes it when an auto-proxy is
    unpickled inside the manager, so it must be accepted here.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before opening any connection, so the
    # 'get_methods' request below uses the same key as the proxy itself.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # Marks the proxy so __reduce__ pickles it as an AutoProxy.
    proxy._isauto = True
    return proxy
1014
1015#
1016# Types/callables which we will register with SyncManager
1017#
1018
class Namespace(object):
    '''
    Plain attribute container; keyword arguments become attributes.
    '''

    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # Attributes whose names start with '_' are left out; the rest
        # are shown as sorted "name=value" pairs.
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001030
class Value(object):
    '''
    Holder for a single value together with its typecode.

    The `lock` argument is accepted but not used by this class.
    '''

    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        '''Return the stored value.'''
        return self._value

    def set(self, value):
        '''Replace the stored value.'''
        self._value = value

    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields

    # Attribute-style access to the same getter/setter pair.
    value = property(get, set)
1042
def Array(typecode, sequence, lock=True):
    '''
    Build an `array.array` of `typecode` from `sequence`.

    The `lock` argument is accepted but not used.
    '''
    new_array = array.array(typecode, sequence)
    return new_array
1045
1046#
1047# Proxy types used by SyncManager
1048#
1049
class IteratorProxy(BaseProxy):
    '''
    Proxy for a remote iterator or generator.

    Each call is forwarded to the referent with the positional
    arguments passed through unchanged.
    '''

    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
1062
1063
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release(); usable in `with` blocks.
    '''

    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # The timeout is only forwarded when one was actually given.
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1075
1076
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable held by the manager.
    '''

    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed.

        The predicate is evaluated in the calling process; only wait()
        goes to the manager.  Returns the last value of the predicate.
        '''
        outcome = predicate()
        if outcome:
            return outcome

        if timeout is None:
            deadline = None
        else:
            deadline = time.monotonic() + timeout
        remaining = None

        while not outcome:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome
1102
Benjamin Petersone711caf2008-06-11 16:44:04 +00001103
class EventProxy(BaseProxy):
    '''
    Proxy for an event object held by the manager.
    '''

    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1114
Richard Oudkerk3730a172012-06-15 18:26:07 +01001115
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier object held by the manager.

    The read-only attributes are fetched through the referent's
    __getattribute__.
    '''

    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1133
1134
class NamespaceProxy(BaseProxy):
    '''
    Proxy that forwards attribute access to the referent.

    Names beginning with '_' are handled on the proxy object itself;
    every other name is forwarded to the manager.
    '''

    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # Fetch _callmethod via object.__getattribute__, as the other
        # attribute hooks on this class do.
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__setattr__', (key, value))

    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__delattr__', (key,))
1152
1153
class ValueProxy(BaseProxy):
    '''
    Proxy for an object exposing get() and set().
    '''

    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        set_args = (value,)
        return self._callmethod('set', set_args)

    # Attribute-style access built on the two remote calls above.
    value = property(get, set)
1161
1162
# Generated base type: one forwarding method per exposed list operation.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.
    '''

    def __iadd__(self, value):
        # `+=` is carried out remotely as extend().
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1176
1177
# Generated proxy type forwarding the listed dict operations.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# The result of __iter__ is mapped to the registered 'Iterator' typeid.
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00001186
1187
# Generated proxy type forwarding length and item access only.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001191
1192
# Generated base type forwarding the listed pool methods.
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# Typeids used for the results of the asynchronous and iterator methods.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    terminate().
    '''

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001209
1210#
1211# Definition of SyncManager
1212#
1213
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''


# Queues: registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# Synchronization primitives backed by the threading module.
SyncManager.register('Event', threading.Event, proxytype=EventProxy)
SyncManager.register('Lock', threading.Lock, proxytype=AcquirerProxy)
SyncManager.register('RLock', threading.RLock, proxytype=AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', threading.Condition,
                     proxytype=ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, proxytype=BarrierProxy)

# Worker pool and container/value types.
SyncManager.register('Pool', pool.Pool, proxytype=PoolProxy)
SyncManager.register('list', list, proxytype=ListProxy)
SyncManager.register('dict', dict, proxytype=DictProxy)
SyncManager.register('Value', Value, proxytype=ValueProxy)
SyncManager.register('Array', Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', Namespace, proxytype=NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
Davin Pottse895de32019-02-23 22:08:16 -06001245
1246#
1247# Definition of SharedMemoryManager and SharedMemoryServer
1248#
1249
1250if HAS_SHMEM:
class _SharedMemoryTracker:
    "Manages one or more shared memory segments."

    def __init__(self, name, segment_names=None):
        """Create a tracker called `name`.

        `segment_names` is an optional list of already-known segment
        names; it is stored by reference.  When omitted, each tracker
        gets its own new empty list (a shared `[]` default would make
        every tracker record into the same list).
        """
        self.shared_memory_context_name = name
        self.segment_names = [] if segment_names is None else segment_names

    def register_segment(self, segment_name):
        "Adds the supplied shared memory block name to tracker."
        util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
        self.segment_names.append(segment_name)

    def destroy_segment(self, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the list of blocks being tracked."""
        util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
        self.segment_names.remove(segment_name)
        segment = shared_memory.SharedMemory(segment_name)
        segment.close()
        segment.unlink()

    def unlink(self):
        "Calls destroy_segment() on all tracked shared memory blocks."
        # Iterate over a copy: destroy_segment() mutates the list.
        for segment_name in self.segment_names[:]:
            self.destroy_segment(segment_name)

    def __del__(self):
        util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
        self.unlink()

    def __getstate__(self):
        return (self.shared_memory_context_name, self.segment_names)

    def __setstate__(self, state):
        # `state` is the (name, segment_names) pair from __getstate__.
        self.__init__(*state)
1286
1287
class SharedMemoryServer(Server):
    """Server that also tracks shared memory segments.

    Adds the 'track_segment', 'release_segment' and 'list_segments'
    requests to those of `Server`.
    """

    public = Server.public + \
             ['track_segment', 'release_segment', 'list_segments']

    def __init__(self, *args, **kwargs):
        Server.__init__(self, *args, **kwargs)
        self.shared_memory_context = \
            _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
        util.debug(f"SharedMemoryServer started by pid {getpid()}")

    def create(*args, **kwargs):
        """Create a new distributed-shared object (not backed by a shared
        memory block) and return its id to be used in a Proxy Object."""
        # The signature is (*args, **kwargs) so that `typeid` may arrive
        # positionally or by keyword; `self` and `typeid` therefore have
        # to be unpacked by hand.
        if not args:
            raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
                            "object needs an argument")
        self = args[0]
        if len(args) >= 3:
            typeid = args[2]
        elif 'typeid' in kwargs:
            typeid = kwargs['typeid']
        else:
            raise TypeError('create expected at least 2 positional '
                            'arguments, got %d' % (len(args)-1))
        # Unless set up as a shared proxy, don't make shared_memory_context
        # a standard part of kwargs.  This makes things easier for supplying
        # simple functions.
        if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
            kwargs['shared_memory_context'] = self.shared_memory_context
        return Server.create(*args, **kwargs)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'

    def shutdown(self, c):
        "Call unlink() on all tracked shared memory, terminate the Server."
        self.shared_memory_context.unlink()
        return Server.shutdown(self, c)

    def track_segment(self, c, segment_name):
        "Adds the supplied shared memory block name to Server's tracker."
        self.shared_memory_context.register_segment(segment_name)

    def release_segment(self, c, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the tracker instance inside the Server."""
        self.shared_memory_context.destroy_segment(segment_name)

    def list_segments(self, c):
        """Returns a list of names of shared memory blocks that the Server
        is currently tracking."""
        return self.shared_memory_context.segment_names
1338
1339
class SharedMemoryManager(BaseManager):
    """Like SyncManager but uses SharedMemoryServer instead of Server.

    It provides methods for creating and returning SharedMemory instances
    and for creating a list-like object (ShareableList) backed by shared
    memory.  It also provides methods that create and return Proxy Objects
    that support synchronization across processes (i.e. multi-process-safe
    locks and semaphores).
    """

    _Server = SharedMemoryServer

    def __init__(self, *args, **kwargs):
        if os.name == "posix":
            # bpo-36867: Ensure the resource_tracker is running before
            # launching the manager process, so that concurrent
            # shared_memory manipulation both in the manager and in the
            # current process does not create two resource_tracker
            # processes.
            from . import resource_tracker
            resource_tracker.ensure_running()
        BaseManager.__init__(self, *args, **kwargs)
        util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

    def __del__(self):
        # Only logs; nothing is released here.
        util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

    def get_server(self):
        'Better than monkeypatching for now; merge into Server ultimately'
        current = self._state.value
        if current == State.INITIAL:
            return self._Server(self._registry, self._address,
                                self._authkey, self._serializer)
        # Any other state means a server can no longer be handed out.
        if current == State.STARTED:
            raise ProcessError("Already started SharedMemoryServer")
        if current == State.SHUTDOWN:
            raise ProcessError("SharedMemoryManager has shut down")
        raise ProcessError("Unknown state {!r}".format(current))

    def SharedMemory(self, size):
        """Returns a new SharedMemory instance with the specified size in
        bytes, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            segment = shared_memory.SharedMemory(None, create=True, size=size)
            try:
                dispatch(conn, None, 'track_segment', (segment.name,))
            except BaseException as exc:
                # Tracking failed: unlink the new block before re-raising.
                segment.unlink()
                raise exc
        return segment

    def ShareableList(self, sequence):
        """Returns a new ShareableList instance populated with the values
        from the input sequence, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            shareable = shared_memory.ShareableList(sequence)
            try:
                dispatch(conn, None, 'track_segment', (shareable.shm.name,))
            except BaseException as exc:
                # Tracking failed: unlink the backing block before re-raising.
                shareable.shm.unlink()
                raise exc
        return shareable