blob: 5cae4c1548b130b5161f422522abc44a42ea2bd4 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15# notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17# notice, this list of conditions and the following disclaimer in the
18# documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20# used to endorse or promote products derived from this software
21# without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000034#
35
36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
38#
39# Imports
40#
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000043import threading
44import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000045import queue
46
47from traceback import format_exc
48from multiprocessing import Process, current_process, active_children, Pool, util, connection
49from multiprocessing.process import AuthenticationString
Florent Xicluna04842a82011-11-11 20:05:50 +010050from multiprocessing.forking import exit, Popen, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Benjamin Petersone711caf2008-06-11 16:44:04 +000052#
Benjamin Petersone711caf2008-06-11 16:44:04 +000053# Register some things for pickling
54#
55
def reduce_array(a):
    '''
    Reduction function which lets `array.array` objects be pickled

    Returns the `(callable, args)` pair that rebuilds an equal array from
    the typecode and the raw bytes of `a`.
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args

# teach the pickler used for inter-process messages how to handle arrays
ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000059
# Concrete types of the objects returned by dict.items(), dict.keys() and
# dict.values(), in that order.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]

if view_types[0] is not list:       # only needed in Py3.0
    import copyreg

    def rebuild_as_list(obj):
        '''
        Reduction function which pickles a dictionary view as a plain list
        '''
        return list, (list(obj),)

    # register with both the multiprocessing pickler and the global
    # copyreg table so either pickling route turns a view into a list
    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
        copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000068
69#
70# Type for identifying shared objects
71#
72
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token carries the `typeid` the object was registered under, the
    `address` stored alongside it and the object's `id` string.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # slots-only class: expose the three fields as a tuple for pickling
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        # `state` is the 3-tuple produced by __getstate__()
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
91
92#
93# Function for communication with a manager's server process
94#
95
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send one request over connection `c` and return the manager's answer

    The request is the tuple `(id, methodname, args, kwds)`.  The reply is
    a `(kind, result)` pair: `result` is returned when `kind` is
    '#RETURN'; any other kind is turned into an exception by
    convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
105
def convert_to_error(kind, result):
    '''
    Build the exception object matching a non-'#RETURN' reply

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies carry text and are wrapped
    in a RemoteError.  Any other kind gives a ValueError.  The exception
    is returned, not raised.
    '''
    if kind == '#ERROR':
        return result
    if kind == '#TRACEBACK':
        assert type(result) is str
        return RemoteError(result)
    if kind == '#UNSERIALIZABLE':
        assert type(result) is str
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
117
class RemoteError(Exception):
    '''
    Exception whose first argument is text describing a remote failure
    '''
    def __str__(self):
        # frame the text between two rules of 75 dashes
        rule = '-'*75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
121
122#
123# Functions for finding the method names of an object
124#
125
def all_methods(obj):
    '''
    Return a list of the names of all callable attributes of `obj`

    Names are reported in the order produced by dir().
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
136
def public_methods(obj):
    '''
    Return the names of the methods of `obj` whose first character is
    not an underscore
    '''
    visible = []
    for method_name in all_methods(obj):
        if method_name[0] == '_':
            continue
        visible.append(method_name)
    return visible
142
143#
144# Server which is run in a process controlled by a manager
145#
146
class Server(object):
    '''
    Server which runs in a process controlled by a manager object and
    holds the shared objects
    '''
    # names of the methods a client may request through handle_request()
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Start listening on `address`

        `registry` maps each typeid to the 4-tuple
        `(callable, exposed, method_to_typeid, proxytype)`, `authkey` must
        be bytes and `serializer` is a key of `listener_client`.
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        # ident '0' is a placeholder entry, excluded from object counts
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Accept connections until interrupted, handling each one in its
        own daemon thread
        '''
        current_process()._manager_server = self
        try:
            try:
                while True:
                    try:
                        accepted = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    worker = threading.Thread(target=self.handle_request,
                                              args=(accepted,))
                    worker.daemon = True
                    worker.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            # a non-zero `stop` ends the loops in serve_client()
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Authenticate a new connection and carry out the single request
        it sends

        The request names one of the methods listed in `public`; the
        outcome goes back as a '#RETURN' or '#TRACEBACK' message and the
        connection is then closed.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            assert funcname in self.public, '%r unrecognized' % funcname
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)

        try:
            c.send(msg)
        except Exception as e:
            # best effort: report the send failure, then log it
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Serve method calls arriving on `conn` until the server stops or
        the peer closes the connection
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        objects = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = conn.recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = objects[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # the referent raised: ship the exception itself
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result must be shared too: register it and
                        # answer with a token instead of the value
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    # raised before the request had been unpacked
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    conn.send(msg)
                except Exception as e:
                    # retry with the repr() of the message
                    conn.send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''Return the referent itself'''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''Return str() of the referent'''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''Return repr() of the referent'''
        return repr(obj)

    # handlers tried by serve_client() after an AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''Do nothing'''
        pass

    def debug_info(self, c):
        '''
        Return a text listing of the shared objects with their refcounts
        --- useful to spot problems with refcounting
        '''
        with self.mutex:
            lines = []
            for ident in sorted(self.id_to_obj):
                if ident == '0':
                    continue
                lines.append(' %s: refcount=%s\n %s' %
                             (ident, self.id_to_refcount[ident],
                              str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(lines)

    def number_of_objects(self, c):
        '''
        Return the number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Acknowledge the request, clean up and exit this process
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for child in active_children():
                    util.debug('terminating a child process of manager')
                    child.terminate()

                for child in active_children():
                    util.debug('terminating a child process of manager')
                    child.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # print whatever went wrong; the process exits regardless
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object of type `typeid` and return the pair
        `(ident, exposed)` where `exposed` is a tuple of method names
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                     self.registry[typeid]

            if factory is None:
                # nothing to call: the single argument is the object
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return a tuple of the exposed method names of the shared object
        indicated by `token`
        '''
        exposed = self.id_to_obj[token.id][1]
        return tuple(exposed)

    def accept_connection(self, c, name):
        '''
        Rename the current thread to `name`, acknowledge, then serve the
        connection from this thread
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one to the reference count of the object `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Subtract one from the reference count of the object `ident`,
        dropping the object when the count reaches zero
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
441
442#
443# Class to represent state of a manager
444#
445
class State(object):
    '''
    Holder for the state of a manager

    `value` is meant to hold one of the constants defined below.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
451
452#
453# Mapping from serializer name to Listener and Client types
454#
455
# each serializer name maps to its (Listener type, Client type) pair
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
460
461#
462# Definition of BaseManager
463#
464
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns a server process with start() or attaches to
    an existing one with connect(); types are made available through
    register().
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        Record the connection parameters; no process is started here

        When `authkey` is None the current process's authkey is used.
        '''
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        rebuild_args = (self._address, self._authkey, self._serializer)
        return type(self).from_address, rebuild_args

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        # a successful no-op request shows the server is answering
        dispatch(conn, None, 'dummy')
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        When `initializer` is given, the server process calls it with
        `initargs` before the server is created.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        server_args = (self._registry, self._address, self._authkey,
                       self._serializer, writer, initializer, initargs)
        self._process = Process(
            target=type(self)._run_server,
            args=server_args,
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        finalizer_args = (self._process, self._address, self._authkey,
                          self._state, self._Client)
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=finalizer_args,
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        token = Token(typeid, self._address, id)
        return token, exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        self._process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the server's shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: a stuck process is terminated below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        # forget the per-address proxy bookkeeping, if any was recorded
        BaseProxy._address_to_local.pop(address, None)

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        Stores `(callable, exposed, method_to_typeid, proxytype)` under
        `typeid` in the class registry.  Unless `create_method` is false,
        a method named `typeid` is also added to the class; calling it
        creates a shared object and returns a proxy for it.
        '''
        # give this class a registry of its own before modifying it
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        # values not given explicitly fall back to the proxy type's own
        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if not create_method:
            return

        def temp(self, *args, **kwds):
            util.debug('requesting creation of a shared %r object', typeid)
            token, exp = self._create(typeid, *args, **kwds)
            proxy = proxytype(
                token, self._serializer, manager=self,
                authkey=self._authkey, exposed=exp
                )
            # give back the reference the server took in create()
            conn = self._Client(token.address, authkey=self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy
        temp.__name__ = typeid
        setattr(cls, typeid, temp)
676
677#
# Subclass of set which gets cleared after a fork
679#
680
class ProcessLocalSet(set):
    '''
    Set registered with util.register_after_fork() so that its clear()
    method is the after-fork callback
    '''
    def __init__(self):
        def _empty(obj):
            obj.clear()
        util.register_after_fork(self, _empty)

    def __reduce__(self):
        # pickles as a call to the type with no arguments
        return type(self), ()
686
687#
688# Definition of BaseProxy
689#
690
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # maps a manager address to the (thread-local storage, id set) pair
    # shared by every proxy for that address
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        Create a proxy for the shared object identified by `token`

        The authentication key is `authkey` if given, otherwise the
        manager's key, otherwise the current process's key.  Unless
        `incref` is false the manager is asked to add a reference.
        '''
        BaseProxy._mutex.acquire()
        try:
            try:
                tls_idset = BaseProxy._address_to_local[token.address]
            except KeyError:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in the
        thread-local storage
        '''
        util.debug('making connection to manager')
        name = current_process().name
        thread_name = threading.current_thread().name
        if thread_name != 'MainThread':
            name += '|' + thread_name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result

        if kind == '#PROXY':
            # the result stays in the manager: wrap its token in a proxy
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # give back the reference the server took when creating it
            conn = self._Client(token.address, authkey=self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy

        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Ask the manager to add a reference for this proxy and register
        the finalizer which later releases it
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'incref', (self._id,))
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        finalizer_args = (self._token, self._authkey, state,
                          self._tls, self._idset, self._Client)
        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=finalizer_args,
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release the reference held on the object identified by `token`
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                dispatch(conn, None, 'decref', (token.id,))
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        After-fork callback: drop the manager and take a new reference
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            # auto-proxies are rebuilt through AutoProxy() and need
            # their exposed method names passed along
            kwds['exposed'] = self._exposed_
            rebuild_with = AutoProxy
        else:
            rebuild_with = type(self)
        return (RebuildProxy,
                (rebuild_with, self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        fields = (type(self).__name__, self._token.typeid, '0x%x' % id(self))
        return '<%s object, typeid %r at %s>' % fields

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
857
858#
859# Function used for unpickling
860#
861
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If this process is running the manager server which owns the object,
    the shared object itself is returned; otherwise `func` is called to
    build a proxy for it.
    '''
    this_process = current_process()
    server = getattr(this_process, '_manager_server', None)

    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # an explicit incref=False in `kwds` is honoured, and no reference is
    # taken when the process is flagged as `_inheriting`
    requested = kwds.pop('incref', True)
    inheriting = getattr(current_process(), '_inheriting', False)
    incref = requested and not inheriting
    return func(token, serializer, incref=incref, **kwds)
878
879#
880# Functions to create proxies and proxy types
881#
882
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are cached by `(name, exposed)`, so a repeated request gives
    back the same class object.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    # source for one forwarding method, filled in per exposed name
    template = '''def %s(self, *args, **kwds):
 return self._callmethod(%r, args, kwds)'''

    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
903
904
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    The proxy type is generated by MakeProxyType() from `exposed`; when
    `exposed` is None the method names are fetched from the server with a
    'get_methods' request.  The authentication key is `authkey` if given,
    otherwise the manager's key, otherwise the current process's key.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key before opening any connection: the server challenges
    # every connection it accepts, so the 'get_methods' request below must
    # use the same key the proxy itself will use, not a bare None.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # flag read by BaseProxy.__reduce__
    proxy._isauto = True
    return proxy
929
930#
931# Types/callables which we will register with SyncManager
932#
933
class Namespace(object):
    '''
    Simple object whose attributes are the keyword arguments it was
    created with
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # show attributes sorted, leaving out names starting with '_'
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(shown)
945
class Value(object):
    '''
    Holder for a single value together with its typecode

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields

    # attribute-style access to get() and set()
    value = property(get, set)
957
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of type `typecode` initialised from `sequence`

    The `lock` argument is accepted but not used.
    '''
    created = array.array(typecode, sequence)
    return created
960
961#
962# Proxy types used by SyncManager
963#
964
class IteratorProxy(BaseProxy):
    '''
    Proxy which forwards the iterator/generator methods to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
977
978
class AcquirerProxy(BaseProxy):
    '''
    Proxy for a referent with acquire() and release() methods; usable in
    a `with` statement
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        call_args = (blocking,)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # acquire with the referent's own default arguments
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
989
990
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for condition-variable objects: adds wait(), notify() and
    notify_all() to the acquire()/release() interface.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        # `n` mirrors threading.Condition.notify(n=1); the default keeps
        # the behaviour of the former no-argument call.
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000999
class EventProxy(BaseProxy):
    '''
    Proxy for event objects; every method is passed on by name through
    `self._callmethod`.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        # The timeout is always sent, even when it is None.
        call_args = (timeout,)
        return self._callmethod('wait', call_args)
1010
class NamespaceProxy(BaseProxy):
    '''
    Proxy giving attribute-style access.

    Names starting with an underscore are handled locally on the proxy
    object itself; all other names are passed to `_callmethod` as
    __getattribute__/__setattr__/__delattr__ calls.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    # `key.startswith('_')` is used rather than `key[0] == '_'` so that an
    # empty attribute name does not raise IndexError here.

    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # Fetch _callmethod via object.__getattribute__ so the lookup does
        # not go through the overridden attribute hooks.
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1028
1029
class ValueProxy(BaseProxy):
    '''
    Proxy with get()/set() methods, also available as the `value`
    property.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        call_args = (value,)
        return self._callmethod('set', call_args)

    # Reading `proxy.value` calls get(); assigning to it calls set().
    value = property(get, set)
1037
1038
# Proxy type built by MakeProxyType from the method names listed below.
# XXX __getslice__ and __setslice__ unneeded in Py3.0
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__delslice__',
        '__getitem__', '__getslice__', '__len__', '__mul__',
        '__reversed__', '__rmul__', '__setitem__', '__setslice__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__'
    ),
)
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself rather
    than the result of the underlying call.
    '''
    def __iadd__(self, value):
        # `proxy += value` is carried out as an 'extend' call.
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # `proxy *= value` is carried out as an '__imul__' call.
        self._callmethod('__imul__', (value,))
        return self
1053
1054
# Proxy type built by MakeProxyType from the method names listed below.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ),
)
1060
1061
# Proxy type built by MakeProxyType from the method names listed below.
# XXX __getslice__ and __setslice__ unneeded in Py3.0
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    (
        '__len__', '__getitem__', '__setitem__',
        '__getslice__', '__setslice__'
    ),
)
1065
1066
# Proxy type built by MakeProxyType from the method names listed below.
PoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
    ),
)

# Maps a method name to a typeid: the asynchronous calls are tagged
# 'AsyncResult' and the imap variants 'Iterator'.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
    }
1078
1079#
1080# Definition of SyncManager
1081#
1082
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.  The registrations
    are made by the `SyncManager.register()` calls that follow this
    class definition; the class body itself adds nothing to `BaseManager`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
1093
# Queue types: registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# Synchronization primitives from the threading module.
SyncManager.register('Event', threading.Event, proxytype=EventProxy)
SyncManager.register('Lock', threading.Lock, proxytype=AcquirerProxy)
SyncManager.register('RLock', threading.RLock, proxytype=AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', threading.Condition,
                     proxytype=ConditionProxy)

# Pool, containers and value holders.
SyncManager.register('Pool', Pool, proxytype=PoolProxy)
SyncManager.register('list', list, proxytype=ListProxy)
SyncManager.register('dict', dict, proxytype=DictProxy)
SyncManager.register('Value', Value, proxytype=ValueProxy)
SyncManager.register('Array', Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', Namespace, proxytype=NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)