blob: 96056b04eaae4c094add43aa8b27a18823f1fb3f [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
22from traceback import format_exc
23from multiprocessing import Process, current_process, active_children, Pool, util, connection
24from multiprocessing.process import AuthenticationString
Richard Oudkerk73d9a292012-06-14 15:30:10 +010025from multiprocessing.forking import Popen, ForkingPickler
Charles-François Natalic8ce7152012-04-17 18:45:57 +020026from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028#
Benjamin Petersone711caf2008-06-11 16:44:04 +000029# Register some things for pickling
30#
31
32def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000033 return array.array, (a.typecode, a.tobytes())
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000034ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000035
36view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000037if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000038 def rebuild_as_list(obj):
39 return list, (list(obj),)
40 for view_type in view_types:
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000041 ForkingPickler.register(view_type, rebuild_as_list)
Amaury Forgeot d'Arcd757e732008-08-20 08:58:40 +000042 import copyreg
43 copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
45#
46# Type for identifying shared objects
47#
48
49class Token(object):
50 '''
51 Type to uniquely indentify a shared object
52 '''
53 __slots__ = ('typeid', 'address', 'id')
54
55 def __init__(self, typeid, address, id):
56 (self.typeid, self.address, self.id) = (typeid, address, id)
57
58 def __getstate__(self):
59 return (self.typeid, self.address, self.id)
60
61 def __setstate__(self, state):
62 (self.typeid, self.address, self.id) = state
63
64 def __repr__(self):
65 return 'Token(typeid=%r, address=%r, id=%r)' % \
66 (self.typeid, self.address, self.id)
67
68#
69# Function for communication with a manager's server process
70#
71
72def dispatch(c, id, methodname, args=(), kwds={}):
73 '''
74 Send a message to manager using connection `c` and return response
75 '''
76 c.send((id, methodname, args, kwds))
77 kind, result = c.recv()
78 if kind == '#RETURN':
79 return result
80 raise convert_to_error(kind, result)
81
82def convert_to_error(kind, result):
83 if kind == '#ERROR':
84 return result
85 elif kind == '#TRACEBACK':
86 assert type(result) is str
87 return RemoteError(result)
88 elif kind == '#UNSERIALIZABLE':
89 assert type(result) is str
90 return RemoteError('Unserializable message: %s\n' % result)
91 else:
92 return ValueError('Unrecognized message type')
93
94class RemoteError(Exception):
95 def __str__(self):
96 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
97
98#
99# Functions for finding the method names of an object
100#
101
102def all_methods(obj):
103 '''
104 Return a list of names of methods of `obj`
105 '''
106 temp = []
107 for name in dir(obj):
108 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200109 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000110 temp.append(name)
111 return temp
112
113def public_methods(obj):
114 '''
115 Return a list of names of methods of `obj` which do not start with '_'
116 '''
117 return [name for name in all_methods(obj) if name[0] != '_']
118
119#
120# Server which is run in a process controlled by a manager
121#
122
123class Server(object):
124 '''
125 Server class which runs in a process controlled by a manager object
126 '''
127 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
128 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
129
130 def __init__(self, registry, address, authkey, serializer):
131 assert isinstance(authkey, bytes)
132 self.registry = registry
133 self.authkey = AuthenticationString(authkey)
134 Listener, Client = listener_client[serializer]
135
136 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100137 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000138 self.address = self.listener.address
139
Jesse Noller63b3a972009-01-21 02:15:48 +0000140 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000141 self.id_to_refcount = {}
142 self.mutex = threading.RLock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143
144 def serve_forever(self):
145 '''
146 Run the server forever
147 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100148 self.stop_event = threading.Event()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149 current_process()._manager_server = self
150 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100151 accepter = threading.Thread(target=self.accepter)
152 accepter.daemon = True
153 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000154 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100155 while not self.stop_event.is_set():
156 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157 except (KeyboardInterrupt, SystemExit):
158 pass
159 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100160 if sys.stdout != sys.__stdout__:
161 util.debug('resetting stdout, stderr')
162 sys.stdout = sys.__stdout__
163 sys.stderr = sys.__stderr__
164 sys.exit(0)
165
166 def accepter(self):
167 while True:
168 try:
169 c = self.listener.accept()
170 except (OSError, IOError):
171 continue
172 t = threading.Thread(target=self.handle_request, args=(c,))
173 t.daemon = True
174 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175
176 def handle_request(self, c):
177 '''
178 Handle a new connection
179 '''
180 funcname = result = request = None
181 try:
182 connection.deliver_challenge(c, self.authkey)
183 connection.answer_challenge(c, self.authkey)
184 request = c.recv()
185 ignore, funcname, args, kwds = request
186 assert funcname in self.public, '%r unrecognized' % funcname
187 func = getattr(self, funcname)
188 except Exception:
189 msg = ('#TRACEBACK', format_exc())
190 else:
191 try:
192 result = func(c, *args, **kwds)
193 except Exception:
194 msg = ('#TRACEBACK', format_exc())
195 else:
196 msg = ('#RETURN', result)
197 try:
198 c.send(msg)
199 except Exception as e:
200 try:
201 c.send(('#TRACEBACK', format_exc()))
202 except Exception:
203 pass
204 util.info('Failure to send message: %r', msg)
205 util.info(' ... request was %r', request)
206 util.info(' ... exception was %r', e)
207
208 c.close()
209
210 def serve_client(self, conn):
211 '''
212 Handle requests from the proxies in a particular process/thread
213 '''
214 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000215 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
217 recv = conn.recv
218 send = conn.send
219 id_to_obj = self.id_to_obj
220
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100221 while not self.stop_event.is_set():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222
223 try:
224 methodname = obj = None
225 request = recv()
226 ident, methodname, args, kwds = request
227 obj, exposed, gettypeid = id_to_obj[ident]
228
229 if methodname not in exposed:
230 raise AttributeError(
231 'method %r of %r object is not in exposed=%r' %
232 (methodname, type(obj), exposed)
233 )
234
235 function = getattr(obj, methodname)
236
237 try:
238 res = function(*args, **kwds)
239 except Exception as e:
240 msg = ('#ERROR', e)
241 else:
242 typeid = gettypeid and gettypeid.get(methodname, None)
243 if typeid:
244 rident, rexposed = self.create(conn, typeid, res)
245 token = Token(typeid, self.address, rident)
246 msg = ('#PROXY', (rexposed, token))
247 else:
248 msg = ('#RETURN', res)
249
250 except AttributeError:
251 if methodname is None:
252 msg = ('#TRACEBACK', format_exc())
253 else:
254 try:
255 fallback_func = self.fallback_mapping[methodname]
256 result = fallback_func(
257 self, conn, ident, obj, *args, **kwds
258 )
259 msg = ('#RETURN', result)
260 except Exception:
261 msg = ('#TRACEBACK', format_exc())
262
263 except EOFError:
264 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000265 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000266 sys.exit(0)
267
268 except Exception:
269 msg = ('#TRACEBACK', format_exc())
270
271 try:
272 try:
273 send(msg)
274 except Exception as e:
275 send(('#UNSERIALIZABLE', repr(msg)))
276 except Exception as e:
277 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000278 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279 util.info(' ... message was %r', msg)
280 util.info(' ... exception was %r', e)
281 conn.close()
282 sys.exit(1)
283
284 def fallback_getvalue(self, conn, ident, obj):
285 return obj
286
287 def fallback_str(self, conn, ident, obj):
288 return str(obj)
289
290 def fallback_repr(self, conn, ident, obj):
291 return repr(obj)
292
293 fallback_mapping = {
294 '__str__':fallback_str,
295 '__repr__':fallback_repr,
296 '#GETVALUE':fallback_getvalue
297 }
298
299 def dummy(self, c):
300 pass
301
302 def debug_info(self, c):
303 '''
304 Return some info --- useful to spot problems with refcounting
305 '''
306 self.mutex.acquire()
307 try:
308 result = []
309 keys = list(self.id_to_obj.keys())
310 keys.sort()
311 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000312 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000313 result.append(' %s: refcount=%s\n %s' %
314 (ident, self.id_to_refcount[ident],
315 str(self.id_to_obj[ident][0])[:75]))
316 return '\n'.join(result)
317 finally:
318 self.mutex.release()
319
320 def number_of_objects(self, c):
321 '''
322 Number of shared objects
323 '''
Jesse Noller63b3a972009-01-21 02:15:48 +0000324 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 def shutdown(self, c):
327 '''
328 Shutdown this process
329 '''
330 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100331 util.debug('manager received shutdown message')
332 c.send(('#RETURN', None))
333 except:
334 import traceback
335 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100337 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000338
339 def create(self, c, typeid, *args, **kwds):
340 '''
341 Create a new shared object and return its id
342 '''
343 self.mutex.acquire()
344 try:
345 callable, exposed, method_to_typeid, proxytype = \
346 self.registry[typeid]
347
348 if callable is None:
349 assert len(args) == 1 and not kwds
350 obj = args[0]
351 else:
352 obj = callable(*args, **kwds)
353
354 if exposed is None:
355 exposed = public_methods(obj)
356 if method_to_typeid is not None:
357 assert type(method_to_typeid) is dict
358 exposed = list(exposed) + list(method_to_typeid)
359
360 ident = '%x' % id(obj) # convert to string because xmlrpclib
361 # only has 32 bit signed integers
362 util.debug('%r callable returned object with id %r', typeid, ident)
363
364 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
365 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000366 self.id_to_refcount[ident] = 0
367 # increment the reference count immediately, to avoid
368 # this object being garbage collected before a Proxy
369 # object for it can be created. The caller of create()
370 # is responsible for doing a decref once the Proxy object
371 # has been created.
372 self.incref(c, ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000373 return ident, tuple(exposed)
374 finally:
375 self.mutex.release()
376
377 def get_methods(self, c, token):
378 '''
379 Return the methods of the shared object indicated by token
380 '''
381 return tuple(self.id_to_obj[token.id][1])
382
383 def accept_connection(self, c, name):
384 '''
385 Spawn a new thread to serve this connection
386 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000387 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000388 c.send(('#RETURN', None))
389 self.serve_client(c)
390
391 def incref(self, c, ident):
392 self.mutex.acquire()
393 try:
Jesse Noller824f4f32008-09-02 19:12:20 +0000394 self.id_to_refcount[ident] += 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395 finally:
396 self.mutex.release()
397
398 def decref(self, c, ident):
399 self.mutex.acquire()
400 try:
401 assert self.id_to_refcount[ident] >= 1
402 self.id_to_refcount[ident] -= 1
403 if self.id_to_refcount[ident] == 0:
404 del self.id_to_obj[ident], self.id_to_refcount[ident]
Benjamin Peterson4ac9ce42009-10-04 14:49:41 +0000405 util.debug('disposing of obj with id %r', ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000406 finally:
407 self.mutex.release()
408
409#
410# Class to represent state of a manager
411#
412
413class State(object):
414 __slots__ = ['value']
415 INITIAL = 0
416 STARTED = 1
417 SHUTDOWN = 2
418
419#
420# Mapping from serializer name to Listener and Client types
421#
422
423listener_client = {
424 'pickle' : (connection.Listener, connection.Client),
425 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
426 }
427
428#
429# Definition of BaseManager
430#
431
432class BaseManager(object):
433 '''
434 Base class for managers
435 '''
436 _registry = {}
437 _Server = Server
438
439 def __init__(self, address=None, authkey=None, serializer='pickle'):
440 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000441 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000442 self._address = address # XXX not final address if eg ('', 0)
443 self._authkey = AuthenticationString(authkey)
444 self._state = State()
445 self._state.value = State.INITIAL
446 self._serializer = serializer
447 self._Listener, self._Client = listener_client[serializer]
448
Benjamin Petersone711caf2008-06-11 16:44:04 +0000449 def get_server(self):
450 '''
451 Return server object with serve_forever() method and address attribute
452 '''
453 assert self._state.value == State.INITIAL
454 return Server(self._registry, self._address,
455 self._authkey, self._serializer)
456
457 def connect(self):
458 '''
459 Connect manager object to the server process
460 '''
461 Listener, Client = listener_client[self._serializer]
462 conn = Client(self._address, authkey=self._authkey)
463 dispatch(conn, None, 'dummy')
464 self._state.value = State.STARTED
465
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000466 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000467 '''
468 Spawn a server process for this manager object
469 '''
470 assert self._state.value == State.INITIAL
471
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200472 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000473 raise TypeError('initializer must be a callable')
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475 # pipe over which we will retrieve address of server
476 reader, writer = connection.Pipe(duplex=False)
477
478 # spawn process which runs a server
479 self._process = Process(
480 target=type(self)._run_server,
481 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000482 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000483 )
484 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000485 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000486 self._process.start()
487
488 # get address of server
489 writer.close()
490 self._address = reader.recv()
491 reader.close()
492
493 # register a finalizer
494 self._state.value = State.STARTED
495 self.shutdown = util.Finalize(
496 self, type(self)._finalize_manager,
497 args=(self._process, self._address, self._authkey,
498 self._state, self._Client),
499 exitpriority=0
500 )
501
502 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000503 def _run_server(cls, registry, address, authkey, serializer, writer,
504 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000505 '''
506 Create a server, report its address and run it
507 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000508 if initializer is not None:
509 initializer(*initargs)
510
Benjamin Petersone711caf2008-06-11 16:44:04 +0000511 # create server
512 server = cls._Server(registry, address, authkey, serializer)
513
514 # inform parent process of the server's address
515 writer.send(server.address)
516 writer.close()
517
518 # run the manager
519 util.info('manager serving at %r', server.address)
520 server.serve_forever()
521
522 def _create(self, typeid, *args, **kwds):
523 '''
524 Create a new shared object; return the token and exposed tuple
525 '''
526 assert self._state.value == State.STARTED, 'server not yet started'
527 conn = self._Client(self._address, authkey=self._authkey)
528 try:
529 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
530 finally:
531 conn.close()
532 return Token(typeid, self._address, id), exposed
533
534 def join(self, timeout=None):
535 '''
536 Join the manager process (if it has been spawned)
537 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100538 if self._process is not None:
539 self._process.join(timeout)
540 if not self._process.is_alive():
541 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000542
543 def _debug_info(self):
544 '''
545 Return some info about the servers shared objects and connections
546 '''
547 conn = self._Client(self._address, authkey=self._authkey)
548 try:
549 return dispatch(conn, None, 'debug_info')
550 finally:
551 conn.close()
552
553 def _number_of_objects(self):
554 '''
555 Return the number of shared objects
556 '''
557 conn = self._Client(self._address, authkey=self._authkey)
558 try:
559 return dispatch(conn, None, 'number_of_objects')
560 finally:
561 conn.close()
562
563 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100564 if self._state.value == State.INITIAL:
565 self.start()
566 assert self._state.value == State.STARTED
Benjamin Petersone711caf2008-06-11 16:44:04 +0000567 return self
568
569 def __exit__(self, exc_type, exc_val, exc_tb):
570 self.shutdown()
571
572 @staticmethod
573 def _finalize_manager(process, address, authkey, state, _Client):
574 '''
575 Shutdown the manager process; will be registered as a finalizer
576 '''
577 if process.is_alive():
578 util.info('sending shutdown message to manager')
579 try:
580 conn = _Client(address, authkey=authkey)
581 try:
582 dispatch(conn, None, 'shutdown')
583 finally:
584 conn.close()
585 except Exception:
586 pass
587
Richard Oudkerk3049f122012-06-15 20:08:29 +0100588 process.join(timeout=1.0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000589 if process.is_alive():
590 util.info('manager still alive')
591 if hasattr(process, 'terminate'):
592 util.info('trying to `terminate()` manager process')
593 process.terminate()
594 process.join(timeout=0.1)
595 if process.is_alive():
596 util.info('manager still alive after terminate')
597
598 state.value = State.SHUTDOWN
599 try:
600 del BaseProxy._address_to_local[address]
601 except KeyError:
602 pass
603
604 address = property(lambda self: self._address)
605
606 @classmethod
607 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
608 method_to_typeid=None, create_method=True):
609 '''
610 Register a typeid with the manager type
611 '''
612 if '_registry' not in cls.__dict__:
613 cls._registry = cls._registry.copy()
614
615 if proxytype is None:
616 proxytype = AutoProxy
617
618 exposed = exposed or getattr(proxytype, '_exposed_', None)
619
620 method_to_typeid = method_to_typeid or \
621 getattr(proxytype, '_method_to_typeid_', None)
622
623 if method_to_typeid:
624 for key, value in list(method_to_typeid.items()):
625 assert type(key) is str, '%r is not a string' % key
626 assert type(value) is str, '%r is not a string' % value
627
628 cls._registry[typeid] = (
629 callable, exposed, method_to_typeid, proxytype
630 )
631
632 if create_method:
633 def temp(self, *args, **kwds):
634 util.debug('requesting creation of a shared %r object', typeid)
635 token, exp = self._create(typeid, *args, **kwds)
636 proxy = proxytype(
637 token, self._serializer, manager=self,
638 authkey=self._authkey, exposed=exp
639 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000640 conn = self._Client(token.address, authkey=self._authkey)
641 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000642 return proxy
643 temp.__name__ = typeid
644 setattr(cls, typeid, temp)
645
646#
647# Subclass of set which get cleared after a fork
648#
649
650class ProcessLocalSet(set):
651 def __init__(self):
652 util.register_after_fork(self, lambda obj: obj.clear())
653 def __reduce__(self):
654 return type(self), ()
655
656#
657# Definition of BaseProxy
658#
659
660class BaseProxy(object):
661 '''
662 A base for proxies of shared objects
663 '''
664 _address_to_local = {}
665 _mutex = util.ForkAwareThreadLock()
666
667 def __init__(self, token, serializer, manager=None,
668 authkey=None, exposed=None, incref=True):
669 BaseProxy._mutex.acquire()
670 try:
671 tls_idset = BaseProxy._address_to_local.get(token.address, None)
672 if tls_idset is None:
673 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
674 BaseProxy._address_to_local[token.address] = tls_idset
675 finally:
676 BaseProxy._mutex.release()
677
678 # self._tls is used to record the connection used by this
679 # thread to communicate with the manager at token.address
680 self._tls = tls_idset[0]
681
682 # self._idset is used to record the identities of all shared
683 # objects for which the current process owns references and
684 # which are in the manager at token.address
685 self._idset = tls_idset[1]
686
687 self._token = token
688 self._id = self._token.id
689 self._manager = manager
690 self._serializer = serializer
691 self._Client = listener_client[serializer][1]
692
693 if authkey is not None:
694 self._authkey = AuthenticationString(authkey)
695 elif self._manager is not None:
696 self._authkey = self._manager._authkey
697 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000698 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000699
700 if incref:
701 self._incref()
702
703 util.register_after_fork(self, BaseProxy._after_fork)
704
705 def _connect(self):
706 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000707 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000708 if threading.current_thread().name != 'MainThread':
709 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000710 conn = self._Client(self._token.address, authkey=self._authkey)
711 dispatch(conn, None, 'accept_connection', (name,))
712 self._tls.connection = conn
713
714 def _callmethod(self, methodname, args=(), kwds={}):
715 '''
716 Try to call a method of the referrent and return a copy of the result
717 '''
718 try:
719 conn = self._tls.connection
720 except AttributeError:
721 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000722 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000723 self._connect()
724 conn = self._tls.connection
725
726 conn.send((self._id, methodname, args, kwds))
727 kind, result = conn.recv()
728
729 if kind == '#RETURN':
730 return result
731 elif kind == '#PROXY':
732 exposed, token = result
733 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100734 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000735 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000736 token, self._serializer, manager=self._manager,
737 authkey=self._authkey, exposed=exposed
738 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000739 conn = self._Client(token.address, authkey=self._authkey)
740 dispatch(conn, None, 'decref', (token.id,))
741 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000742 raise convert_to_error(kind, result)
743
744 def _getvalue(self):
745 '''
746 Get a copy of the value of the referent
747 '''
748 return self._callmethod('#GETVALUE')
749
750 def _incref(self):
751 conn = self._Client(self._token.address, authkey=self._authkey)
752 dispatch(conn, None, 'incref', (self._id,))
753 util.debug('INCREF %r', self._token.id)
754
755 self._idset.add(self._id)
756
757 state = self._manager and self._manager._state
758
759 self._close = util.Finalize(
760 self, BaseProxy._decref,
761 args=(self._token, self._authkey, state,
762 self._tls, self._idset, self._Client),
763 exitpriority=10
764 )
765
766 @staticmethod
767 def _decref(token, authkey, state, tls, idset, _Client):
768 idset.discard(token.id)
769
770 # check whether manager is still alive
771 if state is None or state.value == State.STARTED:
772 # tell manager this process no longer cares about referent
773 try:
774 util.debug('DECREF %r', token.id)
775 conn = _Client(token.address, authkey=authkey)
776 dispatch(conn, None, 'decref', (token.id,))
777 except Exception as e:
778 util.debug('... decref failed %s', e)
779
780 else:
781 util.debug('DECREF %r -- manager already shutdown', token.id)
782
783 # check whether we can close this thread's connection because
784 # the process owns no more references to objects for this manager
785 if not idset and hasattr(tls, 'connection'):
786 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000787 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000788 tls.connection.close()
789 del tls.connection
790
791 def _after_fork(self):
792 self._manager = None
793 try:
794 self._incref()
795 except Exception as e:
796 # the proxy may just be for a manager which has shutdown
797 util.info('incref failed: %s' % e)
798
799 def __reduce__(self):
800 kwds = {}
801 if Popen.thread_is_spawning():
802 kwds['authkey'] = self._authkey
803
804 if getattr(self, '_isauto', False):
805 kwds['exposed'] = self._exposed_
806 return (RebuildProxy,
807 (AutoProxy, self._token, self._serializer, kwds))
808 else:
809 return (RebuildProxy,
810 (type(self), self._token, self._serializer, kwds))
811
812 def __deepcopy__(self, memo):
813 return self._getvalue()
814
815 def __repr__(self):
816 return '<%s object, typeid %r at %s>' % \
817 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
818
819 def __str__(self):
820 '''
821 Return representation of the referent (or a fall-back if that fails)
822 '''
823 try:
824 return self._callmethod('__repr__')
825 except Exception:
826 return repr(self)[:-1] + "; '__str__()' failed>"
827
828#
829# Function used for unpickling
830#
831
832def RebuildProxy(func, token, serializer, kwds):
833 '''
834 Function used for unpickling proxy objects.
835
836 If possible the shared object is returned, or otherwise a proxy for it.
837 '''
838 server = getattr(current_process(), '_manager_server', None)
839
840 if server and server.address == token.address:
841 return server.id_to_obj[token.id][0]
842 else:
843 incref = (
844 kwds.pop('incref', True) and
845 not getattr(current_process(), '_inheriting', False)
846 )
847 return func(token, serializer, incref=incref, **kwds)
848
849#
850# Functions to create proxies and proxy types
851#
852
853def MakeProxyType(name, exposed, _cache={}):
854 '''
855 Return an proxy type whose methods are given by `exposed`
856 '''
857 exposed = tuple(exposed)
858 try:
859 return _cache[(name, exposed)]
860 except KeyError:
861 pass
862
863 dic = {}
864
865 for meth in exposed:
866 exec('''def %s(self, *args, **kwds):
867 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
868
869 ProxyType = type(name, (BaseProxy,), dic)
870 ProxyType._exposed_ = exposed
871 _cache[(name, exposed)] = ProxyType
872 return ProxyType
873
874
875def AutoProxy(token, serializer, manager=None, authkey=None,
876 exposed=None, incref=True):
877 '''
878 Return an auto-proxy for `token`
879 '''
880 _Client = listener_client[serializer][1]
881
882 if exposed is None:
883 conn = _Client(token.address, authkey=authkey)
884 try:
885 exposed = dispatch(conn, None, 'get_methods', (token,))
886 finally:
887 conn.close()
888
889 if authkey is None and manager is not None:
890 authkey = manager._authkey
891 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000892 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000893
894 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
895 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
896 incref=incref)
897 proxy._isauto = True
898 return proxy
899
900#
901# Types/callables which we will register with SyncManager
902#
903
904class Namespace(object):
905 def __init__(self, **kwds):
906 self.__dict__.update(kwds)
907 def __repr__(self):
908 items = list(self.__dict__.items())
909 temp = []
910 for name, value in items:
911 if not name.startswith('_'):
912 temp.append('%s=%r' % (name, value))
913 temp.sort()
914 return 'Namespace(%s)' % str.join(', ', temp)
915
916class Value(object):
917 def __init__(self, typecode, value, lock=True):
918 self._typecode = typecode
919 self._value = value
920 def get(self):
921 return self._value
922 def set(self, value):
923 self._value = value
924 def __repr__(self):
925 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
926 value = property(get, set)
927
928def Array(typecode, sequence, lock=True):
929 return array.array(typecode, sequence)
930
931#
932# Proxy types used by SyncManager
933#
934
935class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000936 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000937 def __iter__(self):
938 return self
939 def __next__(self, *args):
940 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000941 def send(self, *args):
942 return self._callmethod('send', args)
943 def throw(self, *args):
944 return self._callmethod('throw', args)
945 def close(self, *args):
946 return self._callmethod('close', args)
947
948
949class AcquirerProxy(BaseProxy):
950 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +0100951 def acquire(self, blocking=True, timeout=None):
952 args = (blocking,) if timeout is None else (blocking, timeout)
953 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000954 def release(self):
955 return self._callmethod('release')
956 def __enter__(self):
957 return self._callmethod('acquire')
958 def __exit__(self, exc_type, exc_val, exc_tb):
959 return self._callmethod('release')
960
961
962class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000963 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964 def wait(self, timeout=None):
965 return self._callmethod('wait', (timeout,))
966 def notify(self):
967 return self._callmethod('notify')
968 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000969 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200970 def wait_for(self, predicate, timeout=None):
971 result = predicate()
972 if result:
973 return result
974 if timeout is not None:
975 endtime = _time() + timeout
976 else:
977 endtime = None
978 waittime = None
979 while not result:
980 if endtime is not None:
981 waittime = endtime - _time()
982 if waittime <= 0:
983 break
984 self.wait(waittime)
985 result = predicate()
986 return result
987
Benjamin Petersone711caf2008-06-11 16:44:04 +0000988
989class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000990 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000991 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000992 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000993 def set(self):
994 return self._callmethod('set')
995 def clear(self):
996 return self._callmethod('clear')
997 def wait(self, timeout=None):
998 return self._callmethod('wait', (timeout,))
999
Richard Oudkerk3730a172012-06-15 18:26:07 +01001000
1001class BarrierProxy(BaseProxy):
1002 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1003 def wait(self, timeout=None):
1004 return self._callmethod('wait', (timeout,))
1005 def abort(self):
1006 return self._callmethod('abort')
1007 def reset(self):
1008 return self._callmethod('reset')
1009 @property
1010 def parties(self):
1011 return self._callmethod('__getattribute__', ('parties',))
1012 @property
1013 def n_waiting(self):
1014 return self._callmethod('__getattribute__', ('n_waiting',))
1015 @property
1016 def broken(self):
1017 return self._callmethod('__getattribute__', ('broken',))
1018
1019
Benjamin Petersone711caf2008-06-11 16:44:04 +00001020class NamespaceProxy(BaseProxy):
1021 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1022 def __getattr__(self, key):
1023 if key[0] == '_':
1024 return object.__getattribute__(self, key)
1025 callmethod = object.__getattribute__(self, '_callmethod')
1026 return callmethod('__getattribute__', (key,))
1027 def __setattr__(self, key, value):
1028 if key[0] == '_':
1029 return object.__setattr__(self, key, value)
1030 callmethod = object.__getattribute__(self, '_callmethod')
1031 return callmethod('__setattr__', (key, value))
1032 def __delattr__(self, key):
1033 if key[0] == '_':
1034 return object.__delattr__(self, key)
1035 callmethod = object.__getattribute__(self, '_callmethod')
1036 return callmethod('__delattr__', (key,))
1037
1038
1039class ValueProxy(BaseProxy):
1040 _exposed_ = ('get', 'set')
1041 def get(self):
1042 return self._callmethod('get')
1043 def set(self, value):
1044 return self._callmethod('set', (value,))
1045 value = property(get, set)
1046
1047
1048BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001049 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1050 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001051 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1052 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001053 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001054class ListProxy(BaseListProxy):
1055 def __iadd__(self, value):
1056 self._callmethod('extend', (value,))
1057 return self
1058 def __imul__(self, value):
1059 self._callmethod('__imul__', (value,))
1060 return self
1061
1062
1063DictProxy = MakeProxyType('DictProxy', (
1064 '__contains__', '__delitem__', '__getitem__', '__len__',
1065 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1066 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1067 ))
1068
1069
1070ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001071 '__len__', '__getitem__', '__setitem__'
1072 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001073
1074
1075PoolProxy = MakeProxyType('PoolProxy', (
1076 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Antoine Pitroude911b22011-12-21 11:03:24 +01001077 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
Benjamin Petersone711caf2008-06-11 16:44:04 +00001078 ))
1079PoolProxy._method_to_typeid_ = {
1080 'apply_async': 'AsyncResult',
1081 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001082 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001083 'imap': 'Iterator',
1084 'imap_unordered': 'Iterator'
1085 }
1086
1087#
1088# Definition of SyncManager
1089#
1090
1091class SyncManager(BaseManager):
1092 '''
1093 Subclass of `BaseManager` which supports a number of shared object types.
1094
1095 The types registered are those intended for the synchronization
1096 of threads, plus `dict`, `list` and `Namespace`.
1097
1098 The `multiprocessing.Manager()` function creates started instances of
1099 this class.
1100 '''
1101
1102SyncManager.register('Queue', queue.Queue)
1103SyncManager.register('JoinableQueue', queue.Queue)
1104SyncManager.register('Event', threading.Event, EventProxy)
1105SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1106SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1107SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1108SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1109 AcquirerProxy)
1110SyncManager.register('Condition', threading.Condition, ConditionProxy)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001111SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001112SyncManager.register('Pool', Pool, PoolProxy)
1113SyncManager.register('list', list, ListProxy)
1114SyncManager.register('dict', dict, DictProxy)
1115SyncManager.register('Value', Value, ValueProxy)
1116SyncManager.register('Array', Array, ArrayProxy)
1117SyncManager.register('Namespace', Namespace, NamespaceProxy)
1118
1119# types returned by methods of PoolProxy
1120SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1121SyncManager.register('AsyncResult', create_method=False)