blob: 1ab147e29e6ca10d95cc85fbbced067a6b3bba1c [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
22from traceback import format_exc
23from multiprocessing import Process, current_process, active_children, Pool, util, connection
24from multiprocessing.process import AuthenticationString
Richard Oudkerk73d9a292012-06-14 15:30:10 +010025from multiprocessing.forking import Popen, ForkingPickler
Charles-François Natalic8ce7152012-04-17 18:45:57 +020026from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028#
Benjamin Petersone711caf2008-06-11 16:44:04 +000029# Register some things for pickling
30#
31
32def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000033 return array.array, (a.typecode, a.tobytes())
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000034ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000035
36view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000037if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000038 def rebuild_as_list(obj):
39 return list, (list(obj),)
40 for view_type in view_types:
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000041 ForkingPickler.register(view_type, rebuild_as_list)
Amaury Forgeot d'Arcd757e732008-08-20 08:58:40 +000042 import copyreg
43 copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
45#
46# Type for identifying shared objects
47#
48
49class Token(object):
50 '''
51 Type to uniquely indentify a shared object
52 '''
53 __slots__ = ('typeid', 'address', 'id')
54
55 def __init__(self, typeid, address, id):
56 (self.typeid, self.address, self.id) = (typeid, address, id)
57
58 def __getstate__(self):
59 return (self.typeid, self.address, self.id)
60
61 def __setstate__(self, state):
62 (self.typeid, self.address, self.id) = state
63
64 def __repr__(self):
65 return 'Token(typeid=%r, address=%r, id=%r)' % \
66 (self.typeid, self.address, self.id)
67
68#
69# Function for communication with a manager's server process
70#
71
72def dispatch(c, id, methodname, args=(), kwds={}):
73 '''
74 Send a message to manager using connection `c` and return response
75 '''
76 c.send((id, methodname, args, kwds))
77 kind, result = c.recv()
78 if kind == '#RETURN':
79 return result
80 raise convert_to_error(kind, result)
81
82def convert_to_error(kind, result):
83 if kind == '#ERROR':
84 return result
85 elif kind == '#TRACEBACK':
86 assert type(result) is str
87 return RemoteError(result)
88 elif kind == '#UNSERIALIZABLE':
89 assert type(result) is str
90 return RemoteError('Unserializable message: %s\n' % result)
91 else:
92 return ValueError('Unrecognized message type')
93
94class RemoteError(Exception):
95 def __str__(self):
96 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
97
98#
99# Functions for finding the method names of an object
100#
101
102def all_methods(obj):
103 '''
104 Return a list of names of methods of `obj`
105 '''
106 temp = []
107 for name in dir(obj):
108 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200109 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000110 temp.append(name)
111 return temp
112
113def public_methods(obj):
114 '''
115 Return a list of names of methods of `obj` which do not start with '_'
116 '''
117 return [name for name in all_methods(obj) if name[0] != '_']
118
119#
120# Server which is run in a process controlled by a manager
121#
122
123class Server(object):
124 '''
125 Server class which runs in a process controlled by a manager object
126 '''
127 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
128 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
129
130 def __init__(self, registry, address, authkey, serializer):
131 assert isinstance(authkey, bytes)
132 self.registry = registry
133 self.authkey = AuthenticationString(authkey)
134 Listener, Client = listener_client[serializer]
135
136 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100137 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000138 self.address = self.listener.address
139
Jesse Noller63b3a972009-01-21 02:15:48 +0000140 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000141 self.id_to_refcount = {}
142 self.mutex = threading.RLock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143
144 def serve_forever(self):
145 '''
146 Run the server forever
147 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100148 self.stop_event = threading.Event()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149 current_process()._manager_server = self
150 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100151 accepter = threading.Thread(target=self.accepter)
152 accepter.daemon = True
153 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000154 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100155 while not self.stop_event.is_set():
156 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157 except (KeyboardInterrupt, SystemExit):
158 pass
159 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100160 if sys.stdout != sys.__stdout__:
161 util.debug('resetting stdout, stderr')
162 sys.stdout = sys.__stdout__
163 sys.stderr = sys.__stderr__
164 sys.exit(0)
165
166 def accepter(self):
167 while True:
168 try:
169 c = self.listener.accept()
170 except (OSError, IOError):
171 continue
172 t = threading.Thread(target=self.handle_request, args=(c,))
173 t.daemon = True
174 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175
176 def handle_request(self, c):
177 '''
178 Handle a new connection
179 '''
180 funcname = result = request = None
181 try:
182 connection.deliver_challenge(c, self.authkey)
183 connection.answer_challenge(c, self.authkey)
184 request = c.recv()
185 ignore, funcname, args, kwds = request
186 assert funcname in self.public, '%r unrecognized' % funcname
187 func = getattr(self, funcname)
188 except Exception:
189 msg = ('#TRACEBACK', format_exc())
190 else:
191 try:
192 result = func(c, *args, **kwds)
193 except Exception:
194 msg = ('#TRACEBACK', format_exc())
195 else:
196 msg = ('#RETURN', result)
197 try:
198 c.send(msg)
199 except Exception as e:
200 try:
201 c.send(('#TRACEBACK', format_exc()))
202 except Exception:
203 pass
204 util.info('Failure to send message: %r', msg)
205 util.info(' ... request was %r', request)
206 util.info(' ... exception was %r', e)
207
208 c.close()
209
210 def serve_client(self, conn):
211 '''
212 Handle requests from the proxies in a particular process/thread
213 '''
214 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000215 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
217 recv = conn.recv
218 send = conn.send
219 id_to_obj = self.id_to_obj
220
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100221 while not self.stop_event.is_set():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222
223 try:
224 methodname = obj = None
225 request = recv()
226 ident, methodname, args, kwds = request
227 obj, exposed, gettypeid = id_to_obj[ident]
228
229 if methodname not in exposed:
230 raise AttributeError(
231 'method %r of %r object is not in exposed=%r' %
232 (methodname, type(obj), exposed)
233 )
234
235 function = getattr(obj, methodname)
236
237 try:
238 res = function(*args, **kwds)
239 except Exception as e:
240 msg = ('#ERROR', e)
241 else:
242 typeid = gettypeid and gettypeid.get(methodname, None)
243 if typeid:
244 rident, rexposed = self.create(conn, typeid, res)
245 token = Token(typeid, self.address, rident)
246 msg = ('#PROXY', (rexposed, token))
247 else:
248 msg = ('#RETURN', res)
249
250 except AttributeError:
251 if methodname is None:
252 msg = ('#TRACEBACK', format_exc())
253 else:
254 try:
255 fallback_func = self.fallback_mapping[methodname]
256 result = fallback_func(
257 self, conn, ident, obj, *args, **kwds
258 )
259 msg = ('#RETURN', result)
260 except Exception:
261 msg = ('#TRACEBACK', format_exc())
262
263 except EOFError:
264 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000265 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000266 sys.exit(0)
267
268 except Exception:
269 msg = ('#TRACEBACK', format_exc())
270
271 try:
272 try:
273 send(msg)
274 except Exception as e:
275 send(('#UNSERIALIZABLE', repr(msg)))
276 except Exception as e:
277 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000278 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279 util.info(' ... message was %r', msg)
280 util.info(' ... exception was %r', e)
281 conn.close()
282 sys.exit(1)
283
284 def fallback_getvalue(self, conn, ident, obj):
285 return obj
286
287 def fallback_str(self, conn, ident, obj):
288 return str(obj)
289
290 def fallback_repr(self, conn, ident, obj):
291 return repr(obj)
292
293 fallback_mapping = {
294 '__str__':fallback_str,
295 '__repr__':fallback_repr,
296 '#GETVALUE':fallback_getvalue
297 }
298
299 def dummy(self, c):
300 pass
301
302 def debug_info(self, c):
303 '''
304 Return some info --- useful to spot problems with refcounting
305 '''
306 self.mutex.acquire()
307 try:
308 result = []
309 keys = list(self.id_to_obj.keys())
310 keys.sort()
311 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000312 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000313 result.append(' %s: refcount=%s\n %s' %
314 (ident, self.id_to_refcount[ident],
315 str(self.id_to_obj[ident][0])[:75]))
316 return '\n'.join(result)
317 finally:
318 self.mutex.release()
319
320 def number_of_objects(self, c):
321 '''
322 Number of shared objects
323 '''
Jesse Noller63b3a972009-01-21 02:15:48 +0000324 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 def shutdown(self, c):
327 '''
328 Shutdown this process
329 '''
330 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100331 util.debug('manager received shutdown message')
332 c.send(('#RETURN', None))
333 except:
334 import traceback
335 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100337 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000338
339 def create(self, c, typeid, *args, **kwds):
340 '''
341 Create a new shared object and return its id
342 '''
343 self.mutex.acquire()
344 try:
345 callable, exposed, method_to_typeid, proxytype = \
346 self.registry[typeid]
347
348 if callable is None:
349 assert len(args) == 1 and not kwds
350 obj = args[0]
351 else:
352 obj = callable(*args, **kwds)
353
354 if exposed is None:
355 exposed = public_methods(obj)
356 if method_to_typeid is not None:
357 assert type(method_to_typeid) is dict
358 exposed = list(exposed) + list(method_to_typeid)
359
360 ident = '%x' % id(obj) # convert to string because xmlrpclib
361 # only has 32 bit signed integers
362 util.debug('%r callable returned object with id %r', typeid, ident)
363
364 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
365 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000366 self.id_to_refcount[ident] = 0
367 # increment the reference count immediately, to avoid
368 # this object being garbage collected before a Proxy
369 # object for it can be created. The caller of create()
370 # is responsible for doing a decref once the Proxy object
371 # has been created.
372 self.incref(c, ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000373 return ident, tuple(exposed)
374 finally:
375 self.mutex.release()
376
377 def get_methods(self, c, token):
378 '''
379 Return the methods of the shared object indicated by token
380 '''
381 return tuple(self.id_to_obj[token.id][1])
382
383 def accept_connection(self, c, name):
384 '''
385 Spawn a new thread to serve this connection
386 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000387 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000388 c.send(('#RETURN', None))
389 self.serve_client(c)
390
391 def incref(self, c, ident):
392 self.mutex.acquire()
393 try:
Jesse Noller824f4f32008-09-02 19:12:20 +0000394 self.id_to_refcount[ident] += 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395 finally:
396 self.mutex.release()
397
398 def decref(self, c, ident):
399 self.mutex.acquire()
400 try:
401 assert self.id_to_refcount[ident] >= 1
402 self.id_to_refcount[ident] -= 1
403 if self.id_to_refcount[ident] == 0:
404 del self.id_to_obj[ident], self.id_to_refcount[ident]
Benjamin Peterson4ac9ce42009-10-04 14:49:41 +0000405 util.debug('disposing of obj with id %r', ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000406 finally:
407 self.mutex.release()
408
409#
410# Class to represent state of a manager
411#
412
413class State(object):
414 __slots__ = ['value']
415 INITIAL = 0
416 STARTED = 1
417 SHUTDOWN = 2
418
419#
420# Mapping from serializer name to Listener and Client types
421#
422
423listener_client = {
424 'pickle' : (connection.Listener, connection.Client),
425 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
426 }
427
428#
429# Definition of BaseManager
430#
431
432class BaseManager(object):
433 '''
434 Base class for managers
435 '''
436 _registry = {}
437 _Server = Server
438
439 def __init__(self, address=None, authkey=None, serializer='pickle'):
440 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000441 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000442 self._address = address # XXX not final address if eg ('', 0)
443 self._authkey = AuthenticationString(authkey)
444 self._state = State()
445 self._state.value = State.INITIAL
446 self._serializer = serializer
447 self._Listener, self._Client = listener_client[serializer]
448
Benjamin Petersone711caf2008-06-11 16:44:04 +0000449 def get_server(self):
450 '''
451 Return server object with serve_forever() method and address attribute
452 '''
453 assert self._state.value == State.INITIAL
454 return Server(self._registry, self._address,
455 self._authkey, self._serializer)
456
457 def connect(self):
458 '''
459 Connect manager object to the server process
460 '''
461 Listener, Client = listener_client[self._serializer]
462 conn = Client(self._address, authkey=self._authkey)
463 dispatch(conn, None, 'dummy')
464 self._state.value = State.STARTED
465
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000466 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000467 '''
468 Spawn a server process for this manager object
469 '''
470 assert self._state.value == State.INITIAL
471
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200472 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000473 raise TypeError('initializer must be a callable')
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475 # pipe over which we will retrieve address of server
476 reader, writer = connection.Pipe(duplex=False)
477
478 # spawn process which runs a server
479 self._process = Process(
480 target=type(self)._run_server,
481 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000482 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000483 )
484 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000485 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000486 self._process.start()
487
488 # get address of server
489 writer.close()
490 self._address = reader.recv()
491 reader.close()
492
493 # register a finalizer
494 self._state.value = State.STARTED
495 self.shutdown = util.Finalize(
496 self, type(self)._finalize_manager,
497 args=(self._process, self._address, self._authkey,
498 self._state, self._Client),
499 exitpriority=0
500 )
501
502 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000503 def _run_server(cls, registry, address, authkey, serializer, writer,
504 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000505 '''
506 Create a server, report its address and run it
507 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000508 if initializer is not None:
509 initializer(*initargs)
510
Benjamin Petersone711caf2008-06-11 16:44:04 +0000511 # create server
512 server = cls._Server(registry, address, authkey, serializer)
513
514 # inform parent process of the server's address
515 writer.send(server.address)
516 writer.close()
517
518 # run the manager
519 util.info('manager serving at %r', server.address)
520 server.serve_forever()
521
522 def _create(self, typeid, *args, **kwds):
523 '''
524 Create a new shared object; return the token and exposed tuple
525 '''
526 assert self._state.value == State.STARTED, 'server not yet started'
527 conn = self._Client(self._address, authkey=self._authkey)
528 try:
529 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
530 finally:
531 conn.close()
532 return Token(typeid, self._address, id), exposed
533
534 def join(self, timeout=None):
535 '''
536 Join the manager process (if it has been spawned)
537 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100538 if self._process is not None:
539 self._process.join(timeout)
540 if not self._process.is_alive():
541 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000542
543 def _debug_info(self):
544 '''
545 Return some info about the servers shared objects and connections
546 '''
547 conn = self._Client(self._address, authkey=self._authkey)
548 try:
549 return dispatch(conn, None, 'debug_info')
550 finally:
551 conn.close()
552
553 def _number_of_objects(self):
554 '''
555 Return the number of shared objects
556 '''
557 conn = self._Client(self._address, authkey=self._authkey)
558 try:
559 return dispatch(conn, None, 'number_of_objects')
560 finally:
561 conn.close()
562
563 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100564 if self._state.value == State.INITIAL:
565 self.start()
566 assert self._state.value == State.STARTED
Benjamin Petersone711caf2008-06-11 16:44:04 +0000567 return self
568
569 def __exit__(self, exc_type, exc_val, exc_tb):
570 self.shutdown()
571
572 @staticmethod
573 def _finalize_manager(process, address, authkey, state, _Client):
574 '''
575 Shutdown the manager process; will be registered as a finalizer
576 '''
577 if process.is_alive():
578 util.info('sending shutdown message to manager')
579 try:
580 conn = _Client(address, authkey=authkey)
581 try:
582 dispatch(conn, None, 'shutdown')
583 finally:
584 conn.close()
585 except Exception:
586 pass
587
Richard Oudkerk3049f122012-06-15 20:08:29 +0100588 process.join(timeout=1.0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000589 if process.is_alive():
590 util.info('manager still alive')
591 if hasattr(process, 'terminate'):
592 util.info('trying to `terminate()` manager process')
593 process.terminate()
594 process.join(timeout=0.1)
595 if process.is_alive():
596 util.info('manager still alive after terminate')
597
598 state.value = State.SHUTDOWN
599 try:
600 del BaseProxy._address_to_local[address]
601 except KeyError:
602 pass
603
604 address = property(lambda self: self._address)
605
606 @classmethod
607 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
608 method_to_typeid=None, create_method=True):
609 '''
610 Register a typeid with the manager type
611 '''
612 if '_registry' not in cls.__dict__:
613 cls._registry = cls._registry.copy()
614
615 if proxytype is None:
616 proxytype = AutoProxy
617
618 exposed = exposed or getattr(proxytype, '_exposed_', None)
619
620 method_to_typeid = method_to_typeid or \
621 getattr(proxytype, '_method_to_typeid_', None)
622
623 if method_to_typeid:
624 for key, value in list(method_to_typeid.items()):
625 assert type(key) is str, '%r is not a string' % key
626 assert type(value) is str, '%r is not a string' % value
627
628 cls._registry[typeid] = (
629 callable, exposed, method_to_typeid, proxytype
630 )
631
632 if create_method:
633 def temp(self, *args, **kwds):
634 util.debug('requesting creation of a shared %r object', typeid)
635 token, exp = self._create(typeid, *args, **kwds)
636 proxy = proxytype(
637 token, self._serializer, manager=self,
638 authkey=self._authkey, exposed=exp
639 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000640 conn = self._Client(token.address, authkey=self._authkey)
641 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000642 return proxy
643 temp.__name__ = typeid
644 setattr(cls, typeid, temp)
645
646#
647# Subclass of set which get cleared after a fork
648#
649
650class ProcessLocalSet(set):
651 def __init__(self):
652 util.register_after_fork(self, lambda obj: obj.clear())
653 def __reduce__(self):
654 return type(self), ()
655
656#
657# Definition of BaseProxy
658#
659
660class BaseProxy(object):
661 '''
662 A base for proxies of shared objects
663 '''
664 _address_to_local = {}
665 _mutex = util.ForkAwareThreadLock()
666
667 def __init__(self, token, serializer, manager=None,
668 authkey=None, exposed=None, incref=True):
669 BaseProxy._mutex.acquire()
670 try:
671 tls_idset = BaseProxy._address_to_local.get(token.address, None)
672 if tls_idset is None:
673 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
674 BaseProxy._address_to_local[token.address] = tls_idset
675 finally:
676 BaseProxy._mutex.release()
677
678 # self._tls is used to record the connection used by this
679 # thread to communicate with the manager at token.address
680 self._tls = tls_idset[0]
681
682 # self._idset is used to record the identities of all shared
683 # objects for which the current process owns references and
684 # which are in the manager at token.address
685 self._idset = tls_idset[1]
686
687 self._token = token
688 self._id = self._token.id
689 self._manager = manager
690 self._serializer = serializer
691 self._Client = listener_client[serializer][1]
692
693 if authkey is not None:
694 self._authkey = AuthenticationString(authkey)
695 elif self._manager is not None:
696 self._authkey = self._manager._authkey
697 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000698 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000699
700 if incref:
701 self._incref()
702
703 util.register_after_fork(self, BaseProxy._after_fork)
704
705 def _connect(self):
706 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000707 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000708 if threading.current_thread().name != 'MainThread':
709 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000710 conn = self._Client(self._token.address, authkey=self._authkey)
711 dispatch(conn, None, 'accept_connection', (name,))
712 self._tls.connection = conn
713
714 def _callmethod(self, methodname, args=(), kwds={}):
715 '''
716 Try to call a method of the referrent and return a copy of the result
717 '''
718 try:
719 conn = self._tls.connection
720 except AttributeError:
721 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000722 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000723 self._connect()
724 conn = self._tls.connection
725
726 conn.send((self._id, methodname, args, kwds))
727 kind, result = conn.recv()
728
729 if kind == '#RETURN':
730 return result
731 elif kind == '#PROXY':
732 exposed, token = result
733 proxytype = self._manager._registry[token.typeid][-1]
Jesse Noller824f4f32008-09-02 19:12:20 +0000734 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000735 token, self._serializer, manager=self._manager,
736 authkey=self._authkey, exposed=exposed
737 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000738 conn = self._Client(token.address, authkey=self._authkey)
739 dispatch(conn, None, 'decref', (token.id,))
740 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000741 raise convert_to_error(kind, result)
742
743 def _getvalue(self):
744 '''
745 Get a copy of the value of the referent
746 '''
747 return self._callmethod('#GETVALUE')
748
749 def _incref(self):
750 conn = self._Client(self._token.address, authkey=self._authkey)
751 dispatch(conn, None, 'incref', (self._id,))
752 util.debug('INCREF %r', self._token.id)
753
754 self._idset.add(self._id)
755
756 state = self._manager and self._manager._state
757
758 self._close = util.Finalize(
759 self, BaseProxy._decref,
760 args=(self._token, self._authkey, state,
761 self._tls, self._idset, self._Client),
762 exitpriority=10
763 )
764
765 @staticmethod
766 def _decref(token, authkey, state, tls, idset, _Client):
767 idset.discard(token.id)
768
769 # check whether manager is still alive
770 if state is None or state.value == State.STARTED:
771 # tell manager this process no longer cares about referent
772 try:
773 util.debug('DECREF %r', token.id)
774 conn = _Client(token.address, authkey=authkey)
775 dispatch(conn, None, 'decref', (token.id,))
776 except Exception as e:
777 util.debug('... decref failed %s', e)
778
779 else:
780 util.debug('DECREF %r -- manager already shutdown', token.id)
781
782 # check whether we can close this thread's connection because
783 # the process owns no more references to objects for this manager
784 if not idset and hasattr(tls, 'connection'):
785 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000786 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000787 tls.connection.close()
788 del tls.connection
789
790 def _after_fork(self):
791 self._manager = None
792 try:
793 self._incref()
794 except Exception as e:
795 # the proxy may just be for a manager which has shutdown
796 util.info('incref failed: %s' % e)
797
798 def __reduce__(self):
799 kwds = {}
800 if Popen.thread_is_spawning():
801 kwds['authkey'] = self._authkey
802
803 if getattr(self, '_isauto', False):
804 kwds['exposed'] = self._exposed_
805 return (RebuildProxy,
806 (AutoProxy, self._token, self._serializer, kwds))
807 else:
808 return (RebuildProxy,
809 (type(self), self._token, self._serializer, kwds))
810
811 def __deepcopy__(self, memo):
812 return self._getvalue()
813
814 def __repr__(self):
815 return '<%s object, typeid %r at %s>' % \
816 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
817
818 def __str__(self):
819 '''
820 Return representation of the referent (or a fall-back if that fails)
821 '''
822 try:
823 return self._callmethod('__repr__')
824 except Exception:
825 return repr(self)[:-1] + "; '__str__()' failed>"
826
827#
828# Function used for unpickling
829#
830
831def RebuildProxy(func, token, serializer, kwds):
832 '''
833 Function used for unpickling proxy objects.
834
835 If possible the shared object is returned, or otherwise a proxy for it.
836 '''
837 server = getattr(current_process(), '_manager_server', None)
838
839 if server and server.address == token.address:
840 return server.id_to_obj[token.id][0]
841 else:
842 incref = (
843 kwds.pop('incref', True) and
844 not getattr(current_process(), '_inheriting', False)
845 )
846 return func(token, serializer, incref=incref, **kwds)
847
848#
849# Functions to create proxies and proxy types
850#
851
852def MakeProxyType(name, exposed, _cache={}):
853 '''
854 Return an proxy type whose methods are given by `exposed`
855 '''
856 exposed = tuple(exposed)
857 try:
858 return _cache[(name, exposed)]
859 except KeyError:
860 pass
861
862 dic = {}
863
864 for meth in exposed:
865 exec('''def %s(self, *args, **kwds):
866 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
867
868 ProxyType = type(name, (BaseProxy,), dic)
869 ProxyType._exposed_ = exposed
870 _cache[(name, exposed)] = ProxyType
871 return ProxyType
872
873
874def AutoProxy(token, serializer, manager=None, authkey=None,
875 exposed=None, incref=True):
876 '''
877 Return an auto-proxy for `token`
878 '''
879 _Client = listener_client[serializer][1]
880
881 if exposed is None:
882 conn = _Client(token.address, authkey=authkey)
883 try:
884 exposed = dispatch(conn, None, 'get_methods', (token,))
885 finally:
886 conn.close()
887
888 if authkey is None and manager is not None:
889 authkey = manager._authkey
890 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000891 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892
893 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
894 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
895 incref=incref)
896 proxy._isauto = True
897 return proxy
898
899#
900# Types/callables which we will register with SyncManager
901#
902
903class Namespace(object):
904 def __init__(self, **kwds):
905 self.__dict__.update(kwds)
906 def __repr__(self):
907 items = list(self.__dict__.items())
908 temp = []
909 for name, value in items:
910 if not name.startswith('_'):
911 temp.append('%s=%r' % (name, value))
912 temp.sort()
913 return 'Namespace(%s)' % str.join(', ', temp)
914
915class Value(object):
916 def __init__(self, typecode, value, lock=True):
917 self._typecode = typecode
918 self._value = value
919 def get(self):
920 return self._value
921 def set(self, value):
922 self._value = value
923 def __repr__(self):
924 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
925 value = property(get, set)
926
927def Array(typecode, sequence, lock=True):
928 return array.array(typecode, sequence)
929
930#
931# Proxy types used by SyncManager
932#
933
934class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000935 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000936 def __iter__(self):
937 return self
938 def __next__(self, *args):
939 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000940 def send(self, *args):
941 return self._callmethod('send', args)
942 def throw(self, *args):
943 return self._callmethod('throw', args)
944 def close(self, *args):
945 return self._callmethod('close', args)
946
947
948class AcquirerProxy(BaseProxy):
949 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +0100950 def acquire(self, blocking=True, timeout=None):
951 args = (blocking,) if timeout is None else (blocking, timeout)
952 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000953 def release(self):
954 return self._callmethod('release')
955 def __enter__(self):
956 return self._callmethod('acquire')
957 def __exit__(self, exc_type, exc_val, exc_tb):
958 return self._callmethod('release')
959
960
961class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000962 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000963 def wait(self, timeout=None):
964 return self._callmethod('wait', (timeout,))
965 def notify(self):
966 return self._callmethod('notify')
967 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000968 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200969 def wait_for(self, predicate, timeout=None):
970 result = predicate()
971 if result:
972 return result
973 if timeout is not None:
974 endtime = _time() + timeout
975 else:
976 endtime = None
977 waittime = None
978 while not result:
979 if endtime is not None:
980 waittime = endtime - _time()
981 if waittime <= 0:
982 break
983 self.wait(waittime)
984 result = predicate()
985 return result
986
Benjamin Petersone711caf2008-06-11 16:44:04 +0000987
988class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000989 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000990 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000991 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000992 def set(self):
993 return self._callmethod('set')
994 def clear(self):
995 return self._callmethod('clear')
996 def wait(self, timeout=None):
997 return self._callmethod('wait', (timeout,))
998
Richard Oudkerk3730a172012-06-15 18:26:07 +0100999
1000class BarrierProxy(BaseProxy):
1001 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1002 def wait(self, timeout=None):
1003 return self._callmethod('wait', (timeout,))
1004 def abort(self):
1005 return self._callmethod('abort')
1006 def reset(self):
1007 return self._callmethod('reset')
1008 @property
1009 def parties(self):
1010 return self._callmethod('__getattribute__', ('parties',))
1011 @property
1012 def n_waiting(self):
1013 return self._callmethod('__getattribute__', ('n_waiting',))
1014 @property
1015 def broken(self):
1016 return self._callmethod('__getattribute__', ('broken',))
1017
1018
Benjamin Petersone711caf2008-06-11 16:44:04 +00001019class NamespaceProxy(BaseProxy):
1020 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1021 def __getattr__(self, key):
1022 if key[0] == '_':
1023 return object.__getattribute__(self, key)
1024 callmethod = object.__getattribute__(self, '_callmethod')
1025 return callmethod('__getattribute__', (key,))
1026 def __setattr__(self, key, value):
1027 if key[0] == '_':
1028 return object.__setattr__(self, key, value)
1029 callmethod = object.__getattribute__(self, '_callmethod')
1030 return callmethod('__setattr__', (key, value))
1031 def __delattr__(self, key):
1032 if key[0] == '_':
1033 return object.__delattr__(self, key)
1034 callmethod = object.__getattribute__(self, '_callmethod')
1035 return callmethod('__delattr__', (key,))
1036
1037
1038class ValueProxy(BaseProxy):
1039 _exposed_ = ('get', 'set')
1040 def get(self):
1041 return self._callmethod('get')
1042 def set(self, value):
1043 return self._callmethod('set', (value,))
1044 value = property(get, set)
1045
1046
1047BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001048 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1049 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1051 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001052 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001053class ListProxy(BaseListProxy):
1054 def __iadd__(self, value):
1055 self._callmethod('extend', (value,))
1056 return self
1057 def __imul__(self, value):
1058 self._callmethod('__imul__', (value,))
1059 return self
1060
1061
1062DictProxy = MakeProxyType('DictProxy', (
1063 '__contains__', '__delitem__', '__getitem__', '__len__',
1064 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1065 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1066 ))
1067
1068
1069ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001070 '__len__', '__getitem__', '__setitem__'
1071 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001072
1073
1074PoolProxy = MakeProxyType('PoolProxy', (
1075 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Antoine Pitroude911b22011-12-21 11:03:24 +01001076 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
Benjamin Petersone711caf2008-06-11 16:44:04 +00001077 ))
1078PoolProxy._method_to_typeid_ = {
1079 'apply_async': 'AsyncResult',
1080 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001081 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001082 'imap': 'Iterator',
1083 'imap_unordered': 'Iterator'
1084 }
1085
1086#
1087# Definition of SyncManager
1088#
1089
1090class SyncManager(BaseManager):
1091 '''
1092 Subclass of `BaseManager` which supports a number of shared object types.
1093
1094 The types registered are those intended for the synchronization
1095 of threads, plus `dict`, `list` and `Namespace`.
1096
1097 The `multiprocessing.Manager()` function creates started instances of
1098 this class.
1099 '''
1100
1101SyncManager.register('Queue', queue.Queue)
1102SyncManager.register('JoinableQueue', queue.Queue)
1103SyncManager.register('Event', threading.Event, EventProxy)
1104SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1105SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1106SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1107SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1108 AcquirerProxy)
1109SyncManager.register('Condition', threading.Condition, ConditionProxy)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001110SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001111SyncManager.register('Pool', Pool, PoolProxy)
1112SyncManager.register('list', list, ListProxy)
1113SyncManager.register('dict', dict, DictProxy)
1114SyncManager.register('Value', Value, ValueProxy)
1115SyncManager.register('Array', Array, ArrayProxy)
1116SyncManager.register('Namespace', Namespace, NamespaceProxy)
1117
1118# types returned by methods of PoolProxy
1119SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1120SyncManager.register('AsyncResult', create_method=False)