blob: cded4f39d5c81e4691e22d9230e353159263431f [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
22from traceback import format_exc
23from multiprocessing import Process, current_process, active_children, Pool, util, connection
24from multiprocessing.process import AuthenticationString
Richard Oudkerk73d9a292012-06-14 15:30:10 +010025from multiprocessing.forking import Popen, ForkingPickler
Charles-François Natalic8ce7152012-04-17 18:45:57 +020026from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028#
Benjamin Petersone711caf2008-06-11 16:44:04 +000029# Register some things for pickling
30#
31
32def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000033 return array.array, (a.typecode, a.tobytes())
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000034ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000035
36view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000037if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000038 def rebuild_as_list(obj):
39 return list, (list(obj),)
40 for view_type in view_types:
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000041 ForkingPickler.register(view_type, rebuild_as_list)
Amaury Forgeot d'Arcd757e732008-08-20 08:58:40 +000042 import copyreg
43 copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
45#
46# Type for identifying shared objects
47#
48
49class Token(object):
50 '''
51 Type to uniquely indentify a shared object
52 '''
53 __slots__ = ('typeid', 'address', 'id')
54
55 def __init__(self, typeid, address, id):
56 (self.typeid, self.address, self.id) = (typeid, address, id)
57
58 def __getstate__(self):
59 return (self.typeid, self.address, self.id)
60
61 def __setstate__(self, state):
62 (self.typeid, self.address, self.id) = state
63
64 def __repr__(self):
65 return 'Token(typeid=%r, address=%r, id=%r)' % \
66 (self.typeid, self.address, self.id)
67
68#
69# Function for communication with a manager's server process
70#
71
72def dispatch(c, id, methodname, args=(), kwds={}):
73 '''
74 Send a message to manager using connection `c` and return response
75 '''
76 c.send((id, methodname, args, kwds))
77 kind, result = c.recv()
78 if kind == '#RETURN':
79 return result
80 raise convert_to_error(kind, result)
81
82def convert_to_error(kind, result):
83 if kind == '#ERROR':
84 return result
85 elif kind == '#TRACEBACK':
86 assert type(result) is str
87 return RemoteError(result)
88 elif kind == '#UNSERIALIZABLE':
89 assert type(result) is str
90 return RemoteError('Unserializable message: %s\n' % result)
91 else:
92 return ValueError('Unrecognized message type')
93
94class RemoteError(Exception):
95 def __str__(self):
96 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
97
98#
99# Functions for finding the method names of an object
100#
101
102def all_methods(obj):
103 '''
104 Return a list of names of methods of `obj`
105 '''
106 temp = []
107 for name in dir(obj):
108 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200109 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000110 temp.append(name)
111 return temp
112
113def public_methods(obj):
114 '''
115 Return a list of names of methods of `obj` which do not start with '_'
116 '''
117 return [name for name in all_methods(obj) if name[0] != '_']
118
119#
120# Server which is run in a process controlled by a manager
121#
122
123class Server(object):
124 '''
125 Server class which runs in a process controlled by a manager object
126 '''
127 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
128 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
129
130 def __init__(self, registry, address, authkey, serializer):
131 assert isinstance(authkey, bytes)
132 self.registry = registry
133 self.authkey = AuthenticationString(authkey)
134 Listener, Client = listener_client[serializer]
135
136 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100137 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000138 self.address = self.listener.address
139
Jesse Noller63b3a972009-01-21 02:15:48 +0000140 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000141 self.id_to_refcount = {}
142 self.mutex = threading.RLock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143
144 def serve_forever(self):
145 '''
146 Run the server forever
147 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100148 self.stop_event = threading.Event()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149 current_process()._manager_server = self
150 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100151 accepter = threading.Thread(target=self.accepter)
152 accepter.daemon = True
153 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000154 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100155 while not self.stop_event.is_set():
156 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157 except (KeyboardInterrupt, SystemExit):
158 pass
159 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100160 if sys.stdout != sys.__stdout__:
161 util.debug('resetting stdout, stderr')
162 sys.stdout = sys.__stdout__
163 sys.stderr = sys.__stderr__
164 sys.exit(0)
165
166 def accepter(self):
167 while True:
168 try:
169 c = self.listener.accept()
170 except (OSError, IOError):
171 continue
172 t = threading.Thread(target=self.handle_request, args=(c,))
173 t.daemon = True
174 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175
176 def handle_request(self, c):
177 '''
178 Handle a new connection
179 '''
180 funcname = result = request = None
181 try:
182 connection.deliver_challenge(c, self.authkey)
183 connection.answer_challenge(c, self.authkey)
184 request = c.recv()
185 ignore, funcname, args, kwds = request
186 assert funcname in self.public, '%r unrecognized' % funcname
187 func = getattr(self, funcname)
188 except Exception:
189 msg = ('#TRACEBACK', format_exc())
190 else:
191 try:
192 result = func(c, *args, **kwds)
193 except Exception:
194 msg = ('#TRACEBACK', format_exc())
195 else:
196 msg = ('#RETURN', result)
197 try:
198 c.send(msg)
199 except Exception as e:
200 try:
201 c.send(('#TRACEBACK', format_exc()))
202 except Exception:
203 pass
204 util.info('Failure to send message: %r', msg)
205 util.info(' ... request was %r', request)
206 util.info(' ... exception was %r', e)
207
208 c.close()
209
210 def serve_client(self, conn):
211 '''
212 Handle requests from the proxies in a particular process/thread
213 '''
214 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000215 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
217 recv = conn.recv
218 send = conn.send
219 id_to_obj = self.id_to_obj
220
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100221 while not self.stop_event.is_set():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222
223 try:
224 methodname = obj = None
225 request = recv()
226 ident, methodname, args, kwds = request
227 obj, exposed, gettypeid = id_to_obj[ident]
228
229 if methodname not in exposed:
230 raise AttributeError(
231 'method %r of %r object is not in exposed=%r' %
232 (methodname, type(obj), exposed)
233 )
234
235 function = getattr(obj, methodname)
236
237 try:
238 res = function(*args, **kwds)
239 except Exception as e:
240 msg = ('#ERROR', e)
241 else:
242 typeid = gettypeid and gettypeid.get(methodname, None)
243 if typeid:
244 rident, rexposed = self.create(conn, typeid, res)
245 token = Token(typeid, self.address, rident)
246 msg = ('#PROXY', (rexposed, token))
247 else:
248 msg = ('#RETURN', res)
249
250 except AttributeError:
251 if methodname is None:
252 msg = ('#TRACEBACK', format_exc())
253 else:
254 try:
255 fallback_func = self.fallback_mapping[methodname]
256 result = fallback_func(
257 self, conn, ident, obj, *args, **kwds
258 )
259 msg = ('#RETURN', result)
260 except Exception:
261 msg = ('#TRACEBACK', format_exc())
262
263 except EOFError:
264 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000265 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000266 sys.exit(0)
267
268 except Exception:
269 msg = ('#TRACEBACK', format_exc())
270
271 try:
272 try:
273 send(msg)
274 except Exception as e:
275 send(('#UNSERIALIZABLE', repr(msg)))
276 except Exception as e:
277 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000278 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279 util.info(' ... message was %r', msg)
280 util.info(' ... exception was %r', e)
281 conn.close()
282 sys.exit(1)
283
284 def fallback_getvalue(self, conn, ident, obj):
285 return obj
286
287 def fallback_str(self, conn, ident, obj):
288 return str(obj)
289
290 def fallback_repr(self, conn, ident, obj):
291 return repr(obj)
292
293 fallback_mapping = {
294 '__str__':fallback_str,
295 '__repr__':fallback_repr,
296 '#GETVALUE':fallback_getvalue
297 }
298
299 def dummy(self, c):
300 pass
301
302 def debug_info(self, c):
303 '''
304 Return some info --- useful to spot problems with refcounting
305 '''
306 self.mutex.acquire()
307 try:
308 result = []
309 keys = list(self.id_to_obj.keys())
310 keys.sort()
311 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000312 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000313 result.append(' %s: refcount=%s\n %s' %
314 (ident, self.id_to_refcount[ident],
315 str(self.id_to_obj[ident][0])[:75]))
316 return '\n'.join(result)
317 finally:
318 self.mutex.release()
319
320 def number_of_objects(self, c):
321 '''
322 Number of shared objects
323 '''
Jesse Noller63b3a972009-01-21 02:15:48 +0000324 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 def shutdown(self, c):
327 '''
328 Shutdown this process
329 '''
330 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100331 util.debug('manager received shutdown message')
332 c.send(('#RETURN', None))
333 except:
334 import traceback
335 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100337 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000338
339 def create(self, c, typeid, *args, **kwds):
340 '''
341 Create a new shared object and return its id
342 '''
343 self.mutex.acquire()
344 try:
345 callable, exposed, method_to_typeid, proxytype = \
346 self.registry[typeid]
347
348 if callable is None:
349 assert len(args) == 1 and not kwds
350 obj = args[0]
351 else:
352 obj = callable(*args, **kwds)
353
354 if exposed is None:
355 exposed = public_methods(obj)
356 if method_to_typeid is not None:
357 assert type(method_to_typeid) is dict
358 exposed = list(exposed) + list(method_to_typeid)
359
360 ident = '%x' % id(obj) # convert to string because xmlrpclib
361 # only has 32 bit signed integers
362 util.debug('%r callable returned object with id %r', typeid, ident)
363
364 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
365 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000366 self.id_to_refcount[ident] = 0
367 # increment the reference count immediately, to avoid
368 # this object being garbage collected before a Proxy
369 # object for it can be created. The caller of create()
370 # is responsible for doing a decref once the Proxy object
371 # has been created.
372 self.incref(c, ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000373 return ident, tuple(exposed)
374 finally:
375 self.mutex.release()
376
377 def get_methods(self, c, token):
378 '''
379 Return the methods of the shared object indicated by token
380 '''
381 return tuple(self.id_to_obj[token.id][1])
382
383 def accept_connection(self, c, name):
384 '''
385 Spawn a new thread to serve this connection
386 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000387 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000388 c.send(('#RETURN', None))
389 self.serve_client(c)
390
391 def incref(self, c, ident):
392 self.mutex.acquire()
393 try:
Jesse Noller824f4f32008-09-02 19:12:20 +0000394 self.id_to_refcount[ident] += 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395 finally:
396 self.mutex.release()
397
398 def decref(self, c, ident):
399 self.mutex.acquire()
400 try:
401 assert self.id_to_refcount[ident] >= 1
402 self.id_to_refcount[ident] -= 1
403 if self.id_to_refcount[ident] == 0:
404 del self.id_to_obj[ident], self.id_to_refcount[ident]
Benjamin Peterson4ac9ce42009-10-04 14:49:41 +0000405 util.debug('disposing of obj with id %r', ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000406 finally:
407 self.mutex.release()
408
409#
410# Class to represent state of a manager
411#
412
413class State(object):
414 __slots__ = ['value']
415 INITIAL = 0
416 STARTED = 1
417 SHUTDOWN = 2
418
419#
420# Mapping from serializer name to Listener and Client types
421#
422
423listener_client = {
424 'pickle' : (connection.Listener, connection.Client),
425 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
426 }
427
428#
429# Definition of BaseManager
430#
431
432class BaseManager(object):
433 '''
434 Base class for managers
435 '''
436 _registry = {}
437 _Server = Server
438
439 def __init__(self, address=None, authkey=None, serializer='pickle'):
440 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000441 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000442 self._address = address # XXX not final address if eg ('', 0)
443 self._authkey = AuthenticationString(authkey)
444 self._state = State()
445 self._state.value = State.INITIAL
446 self._serializer = serializer
447 self._Listener, self._Client = listener_client[serializer]
448
Benjamin Petersone711caf2008-06-11 16:44:04 +0000449 def get_server(self):
450 '''
451 Return server object with serve_forever() method and address attribute
452 '''
453 assert self._state.value == State.INITIAL
454 return Server(self._registry, self._address,
455 self._authkey, self._serializer)
456
457 def connect(self):
458 '''
459 Connect manager object to the server process
460 '''
461 Listener, Client = listener_client[self._serializer]
462 conn = Client(self._address, authkey=self._authkey)
463 dispatch(conn, None, 'dummy')
464 self._state.value = State.STARTED
465
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000466 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000467 '''
468 Spawn a server process for this manager object
469 '''
470 assert self._state.value == State.INITIAL
471
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200472 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000473 raise TypeError('initializer must be a callable')
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475 # pipe over which we will retrieve address of server
476 reader, writer = connection.Pipe(duplex=False)
477
478 # spawn process which runs a server
479 self._process = Process(
480 target=type(self)._run_server,
481 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000482 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000483 )
484 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000485 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000486 self._process.start()
487
488 # get address of server
489 writer.close()
490 self._address = reader.recv()
491 reader.close()
492
493 # register a finalizer
494 self._state.value = State.STARTED
495 self.shutdown = util.Finalize(
496 self, type(self)._finalize_manager,
497 args=(self._process, self._address, self._authkey,
498 self._state, self._Client),
499 exitpriority=0
500 )
501
502 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000503 def _run_server(cls, registry, address, authkey, serializer, writer,
504 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000505 '''
506 Create a server, report its address and run it
507 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000508 if initializer is not None:
509 initializer(*initargs)
510
Benjamin Petersone711caf2008-06-11 16:44:04 +0000511 # create server
512 server = cls._Server(registry, address, authkey, serializer)
513
514 # inform parent process of the server's address
515 writer.send(server.address)
516 writer.close()
517
518 # run the manager
519 util.info('manager serving at %r', server.address)
520 server.serve_forever()
521
522 def _create(self, typeid, *args, **kwds):
523 '''
524 Create a new shared object; return the token and exposed tuple
525 '''
526 assert self._state.value == State.STARTED, 'server not yet started'
527 conn = self._Client(self._address, authkey=self._authkey)
528 try:
529 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
530 finally:
531 conn.close()
532 return Token(typeid, self._address, id), exposed
533
534 def join(self, timeout=None):
535 '''
536 Join the manager process (if it has been spawned)
537 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100538 if self._process is not None:
539 self._process.join(timeout)
540 if not self._process.is_alive():
541 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000542
543 def _debug_info(self):
544 '''
545 Return some info about the servers shared objects and connections
546 '''
547 conn = self._Client(self._address, authkey=self._authkey)
548 try:
549 return dispatch(conn, None, 'debug_info')
550 finally:
551 conn.close()
552
553 def _number_of_objects(self):
554 '''
555 Return the number of shared objects
556 '''
557 conn = self._Client(self._address, authkey=self._authkey)
558 try:
559 return dispatch(conn, None, 'number_of_objects')
560 finally:
561 conn.close()
562
563 def __enter__(self):
564 return self
565
566 def __exit__(self, exc_type, exc_val, exc_tb):
567 self.shutdown()
568
569 @staticmethod
570 def _finalize_manager(process, address, authkey, state, _Client):
571 '''
572 Shutdown the manager process; will be registered as a finalizer
573 '''
574 if process.is_alive():
575 util.info('sending shutdown message to manager')
576 try:
577 conn = _Client(address, authkey=authkey)
578 try:
579 dispatch(conn, None, 'shutdown')
580 finally:
581 conn.close()
582 except Exception:
583 pass
584
585 process.join(timeout=0.2)
586 if process.is_alive():
587 util.info('manager still alive')
588 if hasattr(process, 'terminate'):
589 util.info('trying to `terminate()` manager process')
590 process.terminate()
591 process.join(timeout=0.1)
592 if process.is_alive():
593 util.info('manager still alive after terminate')
594
595 state.value = State.SHUTDOWN
596 try:
597 del BaseProxy._address_to_local[address]
598 except KeyError:
599 pass
600
601 address = property(lambda self: self._address)
602
603 @classmethod
604 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
605 method_to_typeid=None, create_method=True):
606 '''
607 Register a typeid with the manager type
608 '''
609 if '_registry' not in cls.__dict__:
610 cls._registry = cls._registry.copy()
611
612 if proxytype is None:
613 proxytype = AutoProxy
614
615 exposed = exposed or getattr(proxytype, '_exposed_', None)
616
617 method_to_typeid = method_to_typeid or \
618 getattr(proxytype, '_method_to_typeid_', None)
619
620 if method_to_typeid:
621 for key, value in list(method_to_typeid.items()):
622 assert type(key) is str, '%r is not a string' % key
623 assert type(value) is str, '%r is not a string' % value
624
625 cls._registry[typeid] = (
626 callable, exposed, method_to_typeid, proxytype
627 )
628
629 if create_method:
630 def temp(self, *args, **kwds):
631 util.debug('requesting creation of a shared %r object', typeid)
632 token, exp = self._create(typeid, *args, **kwds)
633 proxy = proxytype(
634 token, self._serializer, manager=self,
635 authkey=self._authkey, exposed=exp
636 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000637 conn = self._Client(token.address, authkey=self._authkey)
638 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000639 return proxy
640 temp.__name__ = typeid
641 setattr(cls, typeid, temp)
642
643#
644# Subclass of set which get cleared after a fork
645#
646
647class ProcessLocalSet(set):
648 def __init__(self):
649 util.register_after_fork(self, lambda obj: obj.clear())
650 def __reduce__(self):
651 return type(self), ()
652
653#
654# Definition of BaseProxy
655#
656
657class BaseProxy(object):
658 '''
659 A base for proxies of shared objects
660 '''
661 _address_to_local = {}
662 _mutex = util.ForkAwareThreadLock()
663
664 def __init__(self, token, serializer, manager=None,
665 authkey=None, exposed=None, incref=True):
666 BaseProxy._mutex.acquire()
667 try:
668 tls_idset = BaseProxy._address_to_local.get(token.address, None)
669 if tls_idset is None:
670 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
671 BaseProxy._address_to_local[token.address] = tls_idset
672 finally:
673 BaseProxy._mutex.release()
674
675 # self._tls is used to record the connection used by this
676 # thread to communicate with the manager at token.address
677 self._tls = tls_idset[0]
678
679 # self._idset is used to record the identities of all shared
680 # objects for which the current process owns references and
681 # which are in the manager at token.address
682 self._idset = tls_idset[1]
683
684 self._token = token
685 self._id = self._token.id
686 self._manager = manager
687 self._serializer = serializer
688 self._Client = listener_client[serializer][1]
689
690 if authkey is not None:
691 self._authkey = AuthenticationString(authkey)
692 elif self._manager is not None:
693 self._authkey = self._manager._authkey
694 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000695 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000696
697 if incref:
698 self._incref()
699
700 util.register_after_fork(self, BaseProxy._after_fork)
701
702 def _connect(self):
703 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000704 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000705 if threading.current_thread().name != 'MainThread':
706 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000707 conn = self._Client(self._token.address, authkey=self._authkey)
708 dispatch(conn, None, 'accept_connection', (name,))
709 self._tls.connection = conn
710
711 def _callmethod(self, methodname, args=(), kwds={}):
712 '''
713 Try to call a method of the referrent and return a copy of the result
714 '''
715 try:
716 conn = self._tls.connection
717 except AttributeError:
718 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000719 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000720 self._connect()
721 conn = self._tls.connection
722
723 conn.send((self._id, methodname, args, kwds))
724 kind, result = conn.recv()
725
726 if kind == '#RETURN':
727 return result
728 elif kind == '#PROXY':
729 exposed, token = result
730 proxytype = self._manager._registry[token.typeid][-1]
Jesse Noller824f4f32008-09-02 19:12:20 +0000731 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000732 token, self._serializer, manager=self._manager,
733 authkey=self._authkey, exposed=exposed
734 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000735 conn = self._Client(token.address, authkey=self._authkey)
736 dispatch(conn, None, 'decref', (token.id,))
737 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000738 raise convert_to_error(kind, result)
739
740 def _getvalue(self):
741 '''
742 Get a copy of the value of the referent
743 '''
744 return self._callmethod('#GETVALUE')
745
746 def _incref(self):
747 conn = self._Client(self._token.address, authkey=self._authkey)
748 dispatch(conn, None, 'incref', (self._id,))
749 util.debug('INCREF %r', self._token.id)
750
751 self._idset.add(self._id)
752
753 state = self._manager and self._manager._state
754
755 self._close = util.Finalize(
756 self, BaseProxy._decref,
757 args=(self._token, self._authkey, state,
758 self._tls, self._idset, self._Client),
759 exitpriority=10
760 )
761
762 @staticmethod
763 def _decref(token, authkey, state, tls, idset, _Client):
764 idset.discard(token.id)
765
766 # check whether manager is still alive
767 if state is None or state.value == State.STARTED:
768 # tell manager this process no longer cares about referent
769 try:
770 util.debug('DECREF %r', token.id)
771 conn = _Client(token.address, authkey=authkey)
772 dispatch(conn, None, 'decref', (token.id,))
773 except Exception as e:
774 util.debug('... decref failed %s', e)
775
776 else:
777 util.debug('DECREF %r -- manager already shutdown', token.id)
778
779 # check whether we can close this thread's connection because
780 # the process owns no more references to objects for this manager
781 if not idset and hasattr(tls, 'connection'):
782 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000783 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000784 tls.connection.close()
785 del tls.connection
786
787 def _after_fork(self):
788 self._manager = None
789 try:
790 self._incref()
791 except Exception as e:
792 # the proxy may just be for a manager which has shutdown
793 util.info('incref failed: %s' % e)
794
795 def __reduce__(self):
796 kwds = {}
797 if Popen.thread_is_spawning():
798 kwds['authkey'] = self._authkey
799
800 if getattr(self, '_isauto', False):
801 kwds['exposed'] = self._exposed_
802 return (RebuildProxy,
803 (AutoProxy, self._token, self._serializer, kwds))
804 else:
805 return (RebuildProxy,
806 (type(self), self._token, self._serializer, kwds))
807
808 def __deepcopy__(self, memo):
809 return self._getvalue()
810
811 def __repr__(self):
812 return '<%s object, typeid %r at %s>' % \
813 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
814
815 def __str__(self):
816 '''
817 Return representation of the referent (or a fall-back if that fails)
818 '''
819 try:
820 return self._callmethod('__repr__')
821 except Exception:
822 return repr(self)[:-1] + "; '__str__()' failed>"
823
824#
825# Function used for unpickling
826#
827
828def RebuildProxy(func, token, serializer, kwds):
829 '''
830 Function used for unpickling proxy objects.
831
832 If possible the shared object is returned, or otherwise a proxy for it.
833 '''
834 server = getattr(current_process(), '_manager_server', None)
835
836 if server and server.address == token.address:
837 return server.id_to_obj[token.id][0]
838 else:
839 incref = (
840 kwds.pop('incref', True) and
841 not getattr(current_process(), '_inheriting', False)
842 )
843 return func(token, serializer, incref=incref, **kwds)
844
845#
846# Functions to create proxies and proxy types
847#
848
849def MakeProxyType(name, exposed, _cache={}):
850 '''
851 Return an proxy type whose methods are given by `exposed`
852 '''
853 exposed = tuple(exposed)
854 try:
855 return _cache[(name, exposed)]
856 except KeyError:
857 pass
858
859 dic = {}
860
861 for meth in exposed:
862 exec('''def %s(self, *args, **kwds):
863 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
864
865 ProxyType = type(name, (BaseProxy,), dic)
866 ProxyType._exposed_ = exposed
867 _cache[(name, exposed)] = ProxyType
868 return ProxyType
869
870
871def AutoProxy(token, serializer, manager=None, authkey=None,
872 exposed=None, incref=True):
873 '''
874 Return an auto-proxy for `token`
875 '''
876 _Client = listener_client[serializer][1]
877
878 if exposed is None:
879 conn = _Client(token.address, authkey=authkey)
880 try:
881 exposed = dispatch(conn, None, 'get_methods', (token,))
882 finally:
883 conn.close()
884
885 if authkey is None and manager is not None:
886 authkey = manager._authkey
887 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000888 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000889
890 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
891 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
892 incref=incref)
893 proxy._isauto = True
894 return proxy
895
896#
897# Types/callables which we will register with SyncManager
898#
899
900class Namespace(object):
901 def __init__(self, **kwds):
902 self.__dict__.update(kwds)
903 def __repr__(self):
904 items = list(self.__dict__.items())
905 temp = []
906 for name, value in items:
907 if not name.startswith('_'):
908 temp.append('%s=%r' % (name, value))
909 temp.sort()
910 return 'Namespace(%s)' % str.join(', ', temp)
911
912class Value(object):
913 def __init__(self, typecode, value, lock=True):
914 self._typecode = typecode
915 self._value = value
916 def get(self):
917 return self._value
918 def set(self, value):
919 self._value = value
920 def __repr__(self):
921 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
922 value = property(get, set)
923
924def Array(typecode, sequence, lock=True):
925 return array.array(typecode, sequence)
926
927#
928# Proxy types used by SyncManager
929#
930
931class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000932 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000933 def __iter__(self):
934 return self
935 def __next__(self, *args):
936 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000937 def send(self, *args):
938 return self._callmethod('send', args)
939 def throw(self, *args):
940 return self._callmethod('throw', args)
941 def close(self, *args):
942 return self._callmethod('close', args)
943
944
945class AcquirerProxy(BaseProxy):
946 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +0100947 def acquire(self, blocking=True, timeout=None):
948 args = (blocking,) if timeout is None else (blocking, timeout)
949 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000950 def release(self):
951 return self._callmethod('release')
952 def __enter__(self):
953 return self._callmethod('acquire')
954 def __exit__(self, exc_type, exc_val, exc_tb):
955 return self._callmethod('release')
956
957
958class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000959 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000960 def wait(self, timeout=None):
961 return self._callmethod('wait', (timeout,))
962 def notify(self):
963 return self._callmethod('notify')
964 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000965 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200966 def wait_for(self, predicate, timeout=None):
967 result = predicate()
968 if result:
969 return result
970 if timeout is not None:
971 endtime = _time() + timeout
972 else:
973 endtime = None
974 waittime = None
975 while not result:
976 if endtime is not None:
977 waittime = endtime - _time()
978 if waittime <= 0:
979 break
980 self.wait(waittime)
981 result = predicate()
982 return result
983
Benjamin Petersone711caf2008-06-11 16:44:04 +0000984
985class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000986 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000987 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000988 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000989 def set(self):
990 return self._callmethod('set')
991 def clear(self):
992 return self._callmethod('clear')
993 def wait(self, timeout=None):
994 return self._callmethod('wait', (timeout,))
995
Richard Oudkerk3730a172012-06-15 18:26:07 +0100996
997class BarrierProxy(BaseProxy):
998 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
999 def wait(self, timeout=None):
1000 return self._callmethod('wait', (timeout,))
1001 def abort(self):
1002 return self._callmethod('abort')
1003 def reset(self):
1004 return self._callmethod('reset')
1005 @property
1006 def parties(self):
1007 return self._callmethod('__getattribute__', ('parties',))
1008 @property
1009 def n_waiting(self):
1010 return self._callmethod('__getattribute__', ('n_waiting',))
1011 @property
1012 def broken(self):
1013 return self._callmethod('__getattribute__', ('broken',))
1014
1015
Benjamin Petersone711caf2008-06-11 16:44:04 +00001016class NamespaceProxy(BaseProxy):
1017 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1018 def __getattr__(self, key):
1019 if key[0] == '_':
1020 return object.__getattribute__(self, key)
1021 callmethod = object.__getattribute__(self, '_callmethod')
1022 return callmethod('__getattribute__', (key,))
1023 def __setattr__(self, key, value):
1024 if key[0] == '_':
1025 return object.__setattr__(self, key, value)
1026 callmethod = object.__getattribute__(self, '_callmethod')
1027 return callmethod('__setattr__', (key, value))
1028 def __delattr__(self, key):
1029 if key[0] == '_':
1030 return object.__delattr__(self, key)
1031 callmethod = object.__getattribute__(self, '_callmethod')
1032 return callmethod('__delattr__', (key,))
1033
1034
1035class ValueProxy(BaseProxy):
1036 _exposed_ = ('get', 'set')
1037 def get(self):
1038 return self._callmethod('get')
1039 def set(self, value):
1040 return self._callmethod('set', (value,))
1041 value = property(get, set)
1042
1043
1044BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001045 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1046 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001047 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1048 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001049 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050class ListProxy(BaseListProxy):
1051 def __iadd__(self, value):
1052 self._callmethod('extend', (value,))
1053 return self
1054 def __imul__(self, value):
1055 self._callmethod('__imul__', (value,))
1056 return self
1057
1058
1059DictProxy = MakeProxyType('DictProxy', (
1060 '__contains__', '__delitem__', '__getitem__', '__len__',
1061 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1062 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1063 ))
1064
1065
1066ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001067 '__len__', '__getitem__', '__setitem__'
1068 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001069
1070
1071PoolProxy = MakeProxyType('PoolProxy', (
1072 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Antoine Pitroude911b22011-12-21 11:03:24 +01001073 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
Benjamin Petersone711caf2008-06-11 16:44:04 +00001074 ))
1075PoolProxy._method_to_typeid_ = {
1076 'apply_async': 'AsyncResult',
1077 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001078 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001079 'imap': 'Iterator',
1080 'imap_unordered': 'Iterator'
1081 }
1082
1083#
1084# Definition of SyncManager
1085#
1086
1087class SyncManager(BaseManager):
1088 '''
1089 Subclass of `BaseManager` which supports a number of shared object types.
1090
1091 The types registered are those intended for the synchronization
1092 of threads, plus `dict`, `list` and `Namespace`.
1093
1094 The `multiprocessing.Manager()` function creates started instances of
1095 this class.
1096 '''
1097
1098SyncManager.register('Queue', queue.Queue)
1099SyncManager.register('JoinableQueue', queue.Queue)
1100SyncManager.register('Event', threading.Event, EventProxy)
1101SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1102SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1103SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1104SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1105 AcquirerProxy)
1106SyncManager.register('Condition', threading.Condition, ConditionProxy)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001107SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001108SyncManager.register('Pool', Pool, PoolProxy)
1109SyncManager.register('list', list, ListProxy)
1110SyncManager.register('dict', dict, DictProxy)
1111SyncManager.register('Value', Value, ValueProxy)
1112SyncManager.register('Array', Array, ArrayProxy)
1113SyncManager.register('Namespace', Namespace, NamespaceProxy)
1114
1115# types returned by methods of PoolProxy
1116SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1117SyncManager.register('AsyncResult', create_method=False)