blob: f895c129a50a74ad6bf1b35b161ea21176955c04 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
22
23from traceback import format_exc
24from multiprocessing import Process, current_process, active_children, Pool, util, connection
25from multiprocessing.process import AuthenticationString
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000026from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000027from multiprocessing.util import Finalize, info
28
29try:
30 from cPickle import PicklingError
31except ImportError:
32 from pickle import PicklingError
33
34#
Benjamin Petersone711caf2008-06-11 16:44:04 +000035# Register some things for pickling
36#
37
38def reduce_array(a):
39 return array.array, (a.typecode, a.tostring())
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000040ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
42view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000043if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000044 def rebuild_as_list(obj):
45 return list, (list(obj),)
46 for view_type in view_types:
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000047 ForkingPickler.register(view_type, rebuild_as_list)
Amaury Forgeot d'Arcd757e732008-08-20 08:58:40 +000048 import copyreg
49 copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000050
51#
52# Type for identifying shared objects
53#
54
55class Token(object):
56 '''
57 Type to uniquely indentify a shared object
58 '''
59 __slots__ = ('typeid', 'address', 'id')
60
61 def __init__(self, typeid, address, id):
62 (self.typeid, self.address, self.id) = (typeid, address, id)
63
64 def __getstate__(self):
65 return (self.typeid, self.address, self.id)
66
67 def __setstate__(self, state):
68 (self.typeid, self.address, self.id) = state
69
70 def __repr__(self):
71 return 'Token(typeid=%r, address=%r, id=%r)' % \
72 (self.typeid, self.address, self.id)
73
74#
75# Function for communication with a manager's server process
76#
77
78def dispatch(c, id, methodname, args=(), kwds={}):
79 '''
80 Send a message to manager using connection `c` and return response
81 '''
82 c.send((id, methodname, args, kwds))
83 kind, result = c.recv()
84 if kind == '#RETURN':
85 return result
86 raise convert_to_error(kind, result)
87
88def convert_to_error(kind, result):
89 if kind == '#ERROR':
90 return result
91 elif kind == '#TRACEBACK':
92 assert type(result) is str
93 return RemoteError(result)
94 elif kind == '#UNSERIALIZABLE':
95 assert type(result) is str
96 return RemoteError('Unserializable message: %s\n' % result)
97 else:
98 return ValueError('Unrecognized message type')
99
100class RemoteError(Exception):
101 def __str__(self):
102 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
103
104#
105# Functions for finding the method names of an object
106#
107
108def all_methods(obj):
109 '''
110 Return a list of names of methods of `obj`
111 '''
112 temp = []
113 for name in dir(obj):
114 func = getattr(obj, name)
115 if hasattr(func, '__call__'):
116 temp.append(name)
117 return temp
118
119def public_methods(obj):
120 '''
121 Return a list of names of methods of `obj` which do not start with '_'
122 '''
123 return [name for name in all_methods(obj) if name[0] != '_']
124
125#
126# Server which is run in a process controlled by a manager
127#
128
129class Server(object):
130 '''
131 Server class which runs in a process controlled by a manager object
132 '''
133 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
134 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
135
136 def __init__(self, registry, address, authkey, serializer):
137 assert isinstance(authkey, bytes)
138 self.registry = registry
139 self.authkey = AuthenticationString(authkey)
140 Listener, Client = listener_client[serializer]
141
142 # do authentication later
143 self.listener = Listener(address=address, backlog=5)
144 self.address = self.listener.address
145
146 self.id_to_obj = {0: (None, ())}
147 self.id_to_refcount = {}
148 self.mutex = threading.RLock()
149 self.stop = 0
150
151 def serve_forever(self):
152 '''
153 Run the server forever
154 '''
155 current_process()._manager_server = self
156 try:
157 try:
158 while 1:
159 try:
160 c = self.listener.accept()
161 except (OSError, IOError):
162 continue
163 t = threading.Thread(target=self.handle_request, args=(c,))
Benjamin Petersonfae4c622008-08-18 18:40:08 +0000164 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000165 t.start()
166 except (KeyboardInterrupt, SystemExit):
167 pass
168 finally:
169 self.stop = 999
170 self.listener.close()
171
172 def handle_request(self, c):
173 '''
174 Handle a new connection
175 '''
176 funcname = result = request = None
177 try:
178 connection.deliver_challenge(c, self.authkey)
179 connection.answer_challenge(c, self.authkey)
180 request = c.recv()
181 ignore, funcname, args, kwds = request
182 assert funcname in self.public, '%r unrecognized' % funcname
183 func = getattr(self, funcname)
184 except Exception:
185 msg = ('#TRACEBACK', format_exc())
186 else:
187 try:
188 result = func(c, *args, **kwds)
189 except Exception:
190 msg = ('#TRACEBACK', format_exc())
191 else:
192 msg = ('#RETURN', result)
193 try:
194 c.send(msg)
195 except Exception as e:
196 try:
197 c.send(('#TRACEBACK', format_exc()))
198 except Exception:
199 pass
200 util.info('Failure to send message: %r', msg)
201 util.info(' ... request was %r', request)
202 util.info(' ... exception was %r', e)
203
204 c.close()
205
206 def serve_client(self, conn):
207 '''
208 Handle requests from the proxies in a particular process/thread
209 '''
210 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000211 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000212
213 recv = conn.recv
214 send = conn.send
215 id_to_obj = self.id_to_obj
216
217 while not self.stop:
218
219 try:
220 methodname = obj = None
221 request = recv()
222 ident, methodname, args, kwds = request
223 obj, exposed, gettypeid = id_to_obj[ident]
224
225 if methodname not in exposed:
226 raise AttributeError(
227 'method %r of %r object is not in exposed=%r' %
228 (methodname, type(obj), exposed)
229 )
230
231 function = getattr(obj, methodname)
232
233 try:
234 res = function(*args, **kwds)
235 except Exception as e:
236 msg = ('#ERROR', e)
237 else:
238 typeid = gettypeid and gettypeid.get(methodname, None)
239 if typeid:
240 rident, rexposed = self.create(conn, typeid, res)
241 token = Token(typeid, self.address, rident)
242 msg = ('#PROXY', (rexposed, token))
243 else:
244 msg = ('#RETURN', res)
245
246 except AttributeError:
247 if methodname is None:
248 msg = ('#TRACEBACK', format_exc())
249 else:
250 try:
251 fallback_func = self.fallback_mapping[methodname]
252 result = fallback_func(
253 self, conn, ident, obj, *args, **kwds
254 )
255 msg = ('#RETURN', result)
256 except Exception:
257 msg = ('#TRACEBACK', format_exc())
258
259 except EOFError:
260 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000261 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000262 sys.exit(0)
263
264 except Exception:
265 msg = ('#TRACEBACK', format_exc())
266
267 try:
268 try:
269 send(msg)
270 except Exception as e:
271 send(('#UNSERIALIZABLE', repr(msg)))
272 except Exception as e:
273 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000274 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000275 util.info(' ... message was %r', msg)
276 util.info(' ... exception was %r', e)
277 conn.close()
278 sys.exit(1)
279
280 def fallback_getvalue(self, conn, ident, obj):
281 return obj
282
283 def fallback_str(self, conn, ident, obj):
284 return str(obj)
285
286 def fallback_repr(self, conn, ident, obj):
287 return repr(obj)
288
289 fallback_mapping = {
290 '__str__':fallback_str,
291 '__repr__':fallback_repr,
292 '#GETVALUE':fallback_getvalue
293 }
294
295 def dummy(self, c):
296 pass
297
298 def debug_info(self, c):
299 '''
300 Return some info --- useful to spot problems with refcounting
301 '''
302 self.mutex.acquire()
303 try:
304 result = []
305 keys = list(self.id_to_obj.keys())
306 keys.sort()
307 for ident in keys:
308 if ident != 0:
309 result.append(' %s: refcount=%s\n %s' %
310 (ident, self.id_to_refcount[ident],
311 str(self.id_to_obj[ident][0])[:75]))
312 return '\n'.join(result)
313 finally:
314 self.mutex.release()
315
316 def number_of_objects(self, c):
317 '''
318 Number of shared objects
319 '''
320 return len(self.id_to_obj) - 1 # don't count ident=0
321
322 def shutdown(self, c):
323 '''
324 Shutdown this process
325 '''
326 try:
327 try:
328 util.debug('manager received shutdown message')
329 c.send(('#RETURN', None))
330
331 if sys.stdout != sys.__stdout__:
332 util.debug('resetting stdout, stderr')
333 sys.stdout = sys.__stdout__
334 sys.stderr = sys.__stderr__
335
336 util._run_finalizers(0)
337
338 for p in active_children():
339 util.debug('terminating a child process of manager')
340 p.terminate()
341
342 for p in active_children():
343 util.debug('terminating a child process of manager')
344 p.join()
345
346 util._run_finalizers()
347 util.info('manager exiting with exitcode 0')
348 except:
349 import traceback
350 traceback.print_exc()
351 finally:
352 exit(0)
353
354 def create(self, c, typeid, *args, **kwds):
355 '''
356 Create a new shared object and return its id
357 '''
358 self.mutex.acquire()
359 try:
360 callable, exposed, method_to_typeid, proxytype = \
361 self.registry[typeid]
362
363 if callable is None:
364 assert len(args) == 1 and not kwds
365 obj = args[0]
366 else:
367 obj = callable(*args, **kwds)
368
369 if exposed is None:
370 exposed = public_methods(obj)
371 if method_to_typeid is not None:
372 assert type(method_to_typeid) is dict
373 exposed = list(exposed) + list(method_to_typeid)
374
375 ident = '%x' % id(obj) # convert to string because xmlrpclib
376 # only has 32 bit signed integers
377 util.debug('%r callable returned object with id %r', typeid, ident)
378
379 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
380 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000381 self.id_to_refcount[ident] = 0
382 # increment the reference count immediately, to avoid
383 # this object being garbage collected before a Proxy
384 # object for it can be created. The caller of create()
385 # is responsible for doing a decref once the Proxy object
386 # has been created.
387 self.incref(c, ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000388 return ident, tuple(exposed)
389 finally:
390 self.mutex.release()
391
392 def get_methods(self, c, token):
393 '''
394 Return the methods of the shared object indicated by token
395 '''
396 return tuple(self.id_to_obj[token.id][1])
397
398 def accept_connection(self, c, name):
399 '''
400 Spawn a new thread to serve this connection
401 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000402 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000403 c.send(('#RETURN', None))
404 self.serve_client(c)
405
406 def incref(self, c, ident):
407 self.mutex.acquire()
408 try:
Jesse Noller824f4f32008-09-02 19:12:20 +0000409 self.id_to_refcount[ident] += 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000410 finally:
411 self.mutex.release()
412
413 def decref(self, c, ident):
414 self.mutex.acquire()
415 try:
416 assert self.id_to_refcount[ident] >= 1
417 self.id_to_refcount[ident] -= 1
418 if self.id_to_refcount[ident] == 0:
419 del self.id_to_obj[ident], self.id_to_refcount[ident]
420 util.debug('disposing of obj with id %d', ident)
421 finally:
422 self.mutex.release()
423
424#
425# Class to represent state of a manager
426#
427
428class State(object):
429 __slots__ = ['value']
430 INITIAL = 0
431 STARTED = 1
432 SHUTDOWN = 2
433
434#
435# Mapping from serializer name to Listener and Client types
436#
437
438listener_client = {
439 'pickle' : (connection.Listener, connection.Client),
440 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
441 }
442
443#
444# Definition of BaseManager
445#
446
447class BaseManager(object):
448 '''
449 Base class for managers
450 '''
451 _registry = {}
452 _Server = Server
453
454 def __init__(self, address=None, authkey=None, serializer='pickle'):
455 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000456 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000457 self._address = address # XXX not final address if eg ('', 0)
458 self._authkey = AuthenticationString(authkey)
459 self._state = State()
460 self._state.value = State.INITIAL
461 self._serializer = serializer
462 self._Listener, self._Client = listener_client[serializer]
463
464 def __reduce__(self):
465 return type(self).from_address, \
466 (self._address, self._authkey, self._serializer)
467
468 def get_server(self):
469 '''
470 Return server object with serve_forever() method and address attribute
471 '''
472 assert self._state.value == State.INITIAL
473 return Server(self._registry, self._address,
474 self._authkey, self._serializer)
475
476 def connect(self):
477 '''
478 Connect manager object to the server process
479 '''
480 Listener, Client = listener_client[self._serializer]
481 conn = Client(self._address, authkey=self._authkey)
482 dispatch(conn, None, 'dummy')
483 self._state.value = State.STARTED
484
485 def start(self):
486 '''
487 Spawn a server process for this manager object
488 '''
489 assert self._state.value == State.INITIAL
490
491 # pipe over which we will retrieve address of server
492 reader, writer = connection.Pipe(duplex=False)
493
494 # spawn process which runs a server
495 self._process = Process(
496 target=type(self)._run_server,
497 args=(self._registry, self._address, self._authkey,
498 self._serializer, writer),
499 )
500 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000501 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000502 self._process.start()
503
504 # get address of server
505 writer.close()
506 self._address = reader.recv()
507 reader.close()
508
509 # register a finalizer
510 self._state.value = State.STARTED
511 self.shutdown = util.Finalize(
512 self, type(self)._finalize_manager,
513 args=(self._process, self._address, self._authkey,
514 self._state, self._Client),
515 exitpriority=0
516 )
517
518 @classmethod
519 def _run_server(cls, registry, address, authkey, serializer, writer):
520 '''
521 Create a server, report its address and run it
522 '''
523 # create server
524 server = cls._Server(registry, address, authkey, serializer)
525
526 # inform parent process of the server's address
527 writer.send(server.address)
528 writer.close()
529
530 # run the manager
531 util.info('manager serving at %r', server.address)
532 server.serve_forever()
533
534 def _create(self, typeid, *args, **kwds):
535 '''
536 Create a new shared object; return the token and exposed tuple
537 '''
538 assert self._state.value == State.STARTED, 'server not yet started'
539 conn = self._Client(self._address, authkey=self._authkey)
540 try:
541 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
542 finally:
543 conn.close()
544 return Token(typeid, self._address, id), exposed
545
546 def join(self, timeout=None):
547 '''
548 Join the manager process (if it has been spawned)
549 '''
550 self._process.join(timeout)
551
552 def _debug_info(self):
553 '''
554 Return some info about the servers shared objects and connections
555 '''
556 conn = self._Client(self._address, authkey=self._authkey)
557 try:
558 return dispatch(conn, None, 'debug_info')
559 finally:
560 conn.close()
561
562 def _number_of_objects(self):
563 '''
564 Return the number of shared objects
565 '''
566 conn = self._Client(self._address, authkey=self._authkey)
567 try:
568 return dispatch(conn, None, 'number_of_objects')
569 finally:
570 conn.close()
571
572 def __enter__(self):
573 return self
574
575 def __exit__(self, exc_type, exc_val, exc_tb):
576 self.shutdown()
577
578 @staticmethod
579 def _finalize_manager(process, address, authkey, state, _Client):
580 '''
581 Shutdown the manager process; will be registered as a finalizer
582 '''
583 if process.is_alive():
584 util.info('sending shutdown message to manager')
585 try:
586 conn = _Client(address, authkey=authkey)
587 try:
588 dispatch(conn, None, 'shutdown')
589 finally:
590 conn.close()
591 except Exception:
592 pass
593
594 process.join(timeout=0.2)
595 if process.is_alive():
596 util.info('manager still alive')
597 if hasattr(process, 'terminate'):
598 util.info('trying to `terminate()` manager process')
599 process.terminate()
600 process.join(timeout=0.1)
601 if process.is_alive():
602 util.info('manager still alive after terminate')
603
604 state.value = State.SHUTDOWN
605 try:
606 del BaseProxy._address_to_local[address]
607 except KeyError:
608 pass
609
610 address = property(lambda self: self._address)
611
612 @classmethod
613 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
614 method_to_typeid=None, create_method=True):
615 '''
616 Register a typeid with the manager type
617 '''
618 if '_registry' not in cls.__dict__:
619 cls._registry = cls._registry.copy()
620
621 if proxytype is None:
622 proxytype = AutoProxy
623
624 exposed = exposed or getattr(proxytype, '_exposed_', None)
625
626 method_to_typeid = method_to_typeid or \
627 getattr(proxytype, '_method_to_typeid_', None)
628
629 if method_to_typeid:
630 for key, value in list(method_to_typeid.items()):
631 assert type(key) is str, '%r is not a string' % key
632 assert type(value) is str, '%r is not a string' % value
633
634 cls._registry[typeid] = (
635 callable, exposed, method_to_typeid, proxytype
636 )
637
638 if create_method:
639 def temp(self, *args, **kwds):
640 util.debug('requesting creation of a shared %r object', typeid)
641 token, exp = self._create(typeid, *args, **kwds)
642 proxy = proxytype(
643 token, self._serializer, manager=self,
644 authkey=self._authkey, exposed=exp
645 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000646 conn = self._Client(token.address, authkey=self._authkey)
647 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000648 return proxy
649 temp.__name__ = typeid
650 setattr(cls, typeid, temp)
651
652#
653# Subclass of set which get cleared after a fork
654#
655
656class ProcessLocalSet(set):
657 def __init__(self):
658 util.register_after_fork(self, lambda obj: obj.clear())
659 def __reduce__(self):
660 return type(self), ()
661
662#
663# Definition of BaseProxy
664#
665
666class BaseProxy(object):
667 '''
668 A base for proxies of shared objects
669 '''
670 _address_to_local = {}
671 _mutex = util.ForkAwareThreadLock()
672
673 def __init__(self, token, serializer, manager=None,
674 authkey=None, exposed=None, incref=True):
675 BaseProxy._mutex.acquire()
676 try:
677 tls_idset = BaseProxy._address_to_local.get(token.address, None)
678 if tls_idset is None:
679 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
680 BaseProxy._address_to_local[token.address] = tls_idset
681 finally:
682 BaseProxy._mutex.release()
683
684 # self._tls is used to record the connection used by this
685 # thread to communicate with the manager at token.address
686 self._tls = tls_idset[0]
687
688 # self._idset is used to record the identities of all shared
689 # objects for which the current process owns references and
690 # which are in the manager at token.address
691 self._idset = tls_idset[1]
692
693 self._token = token
694 self._id = self._token.id
695 self._manager = manager
696 self._serializer = serializer
697 self._Client = listener_client[serializer][1]
698
699 if authkey is not None:
700 self._authkey = AuthenticationString(authkey)
701 elif self._manager is not None:
702 self._authkey = self._manager._authkey
703 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000704 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000705
706 if incref:
707 self._incref()
708
709 util.register_after_fork(self, BaseProxy._after_fork)
710
711 def _connect(self):
712 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000713 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000714 if threading.current_thread().name != 'MainThread':
715 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000716 conn = self._Client(self._token.address, authkey=self._authkey)
717 dispatch(conn, None, 'accept_connection', (name,))
718 self._tls.connection = conn
719
720 def _callmethod(self, methodname, args=(), kwds={}):
721 '''
722 Try to call a method of the referrent and return a copy of the result
723 '''
724 try:
725 conn = self._tls.connection
726 except AttributeError:
727 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000728 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729 self._connect()
730 conn = self._tls.connection
731
732 conn.send((self._id, methodname, args, kwds))
733 kind, result = conn.recv()
734
735 if kind == '#RETURN':
736 return result
737 elif kind == '#PROXY':
738 exposed, token = result
739 proxytype = self._manager._registry[token.typeid][-1]
Jesse Noller824f4f32008-09-02 19:12:20 +0000740 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000741 token, self._serializer, manager=self._manager,
742 authkey=self._authkey, exposed=exposed
743 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000744 conn = self._Client(token.address, authkey=self._authkey)
745 dispatch(conn, None, 'decref', (token.id,))
746 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000747 raise convert_to_error(kind, result)
748
749 def _getvalue(self):
750 '''
751 Get a copy of the value of the referent
752 '''
753 return self._callmethod('#GETVALUE')
754
755 def _incref(self):
756 conn = self._Client(self._token.address, authkey=self._authkey)
757 dispatch(conn, None, 'incref', (self._id,))
758 util.debug('INCREF %r', self._token.id)
759
760 self._idset.add(self._id)
761
762 state = self._manager and self._manager._state
763
764 self._close = util.Finalize(
765 self, BaseProxy._decref,
766 args=(self._token, self._authkey, state,
767 self._tls, self._idset, self._Client),
768 exitpriority=10
769 )
770
771 @staticmethod
772 def _decref(token, authkey, state, tls, idset, _Client):
773 idset.discard(token.id)
774
775 # check whether manager is still alive
776 if state is None or state.value == State.STARTED:
777 # tell manager this process no longer cares about referent
778 try:
779 util.debug('DECREF %r', token.id)
780 conn = _Client(token.address, authkey=authkey)
781 dispatch(conn, None, 'decref', (token.id,))
782 except Exception as e:
783 util.debug('... decref failed %s', e)
784
785 else:
786 util.debug('DECREF %r -- manager already shutdown', token.id)
787
788 # check whether we can close this thread's connection because
789 # the process owns no more references to objects for this manager
790 if not idset and hasattr(tls, 'connection'):
791 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000792 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000793 tls.connection.close()
794 del tls.connection
795
796 def _after_fork(self):
797 self._manager = None
798 try:
799 self._incref()
800 except Exception as e:
801 # the proxy may just be for a manager which has shutdown
802 util.info('incref failed: %s' % e)
803
804 def __reduce__(self):
805 kwds = {}
806 if Popen.thread_is_spawning():
807 kwds['authkey'] = self._authkey
808
809 if getattr(self, '_isauto', False):
810 kwds['exposed'] = self._exposed_
811 return (RebuildProxy,
812 (AutoProxy, self._token, self._serializer, kwds))
813 else:
814 return (RebuildProxy,
815 (type(self), self._token, self._serializer, kwds))
816
817 def __deepcopy__(self, memo):
818 return self._getvalue()
819
820 def __repr__(self):
821 return '<%s object, typeid %r at %s>' % \
822 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
823
824 def __str__(self):
825 '''
826 Return representation of the referent (or a fall-back if that fails)
827 '''
828 try:
829 return self._callmethod('__repr__')
830 except Exception:
831 return repr(self)[:-1] + "; '__str__()' failed>"
832
833#
834# Function used for unpickling
835#
836
837def RebuildProxy(func, token, serializer, kwds):
838 '''
839 Function used for unpickling proxy objects.
840
841 If possible the shared object is returned, or otherwise a proxy for it.
842 '''
843 server = getattr(current_process(), '_manager_server', None)
844
845 if server and server.address == token.address:
846 return server.id_to_obj[token.id][0]
847 else:
848 incref = (
849 kwds.pop('incref', True) and
850 not getattr(current_process(), '_inheriting', False)
851 )
852 return func(token, serializer, incref=incref, **kwds)
853
854#
855# Functions to create proxies and proxy types
856#
857
858def MakeProxyType(name, exposed, _cache={}):
859 '''
860 Return an proxy type whose methods are given by `exposed`
861 '''
862 exposed = tuple(exposed)
863 try:
864 return _cache[(name, exposed)]
865 except KeyError:
866 pass
867
868 dic = {}
869
870 for meth in exposed:
871 exec('''def %s(self, *args, **kwds):
872 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
873
874 ProxyType = type(name, (BaseProxy,), dic)
875 ProxyType._exposed_ = exposed
876 _cache[(name, exposed)] = ProxyType
877 return ProxyType
878
879
880def AutoProxy(token, serializer, manager=None, authkey=None,
881 exposed=None, incref=True):
882 '''
883 Return an auto-proxy for `token`
884 '''
885 _Client = listener_client[serializer][1]
886
887 if exposed is None:
888 conn = _Client(token.address, authkey=authkey)
889 try:
890 exposed = dispatch(conn, None, 'get_methods', (token,))
891 finally:
892 conn.close()
893
894 if authkey is None and manager is not None:
895 authkey = manager._authkey
896 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000897 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898
899 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
900 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
901 incref=incref)
902 proxy._isauto = True
903 return proxy
904
905#
906# Types/callables which we will register with SyncManager
907#
908
909class Namespace(object):
910 def __init__(self, **kwds):
911 self.__dict__.update(kwds)
912 def __repr__(self):
913 items = list(self.__dict__.items())
914 temp = []
915 for name, value in items:
916 if not name.startswith('_'):
917 temp.append('%s=%r' % (name, value))
918 temp.sort()
919 return 'Namespace(%s)' % str.join(', ', temp)
920
921class Value(object):
922 def __init__(self, typecode, value, lock=True):
923 self._typecode = typecode
924 self._value = value
925 def get(self):
926 return self._value
927 def set(self, value):
928 self._value = value
929 def __repr__(self):
930 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
931 value = property(get, set)
932
933def Array(typecode, sequence, lock=True):
934 return array.array(typecode, sequence)
935
936#
937# Proxy types used by SyncManager
938#
939
940class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000941 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942 def __iter__(self):
943 return self
944 def __next__(self, *args):
945 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000946 def send(self, *args):
947 return self._callmethod('send', args)
948 def throw(self, *args):
949 return self._callmethod('throw', args)
950 def close(self, *args):
951 return self._callmethod('close', args)
952
953
954class AcquirerProxy(BaseProxy):
955 _exposed_ = ('acquire', 'release')
956 def acquire(self, blocking=True):
957 return self._callmethod('acquire', (blocking,))
958 def release(self):
959 return self._callmethod('release')
960 def __enter__(self):
961 return self._callmethod('acquire')
962 def __exit__(self, exc_type, exc_val, exc_tb):
963 return self._callmethod('release')
964
965
966class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000967 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000968 def wait(self, timeout=None):
969 return self._callmethod('wait', (timeout,))
970 def notify(self):
971 return self._callmethod('notify')
972 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000973 return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000974
975class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000976 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000977 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000978 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000979 def set(self):
980 return self._callmethod('set')
981 def clear(self):
982 return self._callmethod('clear')
983 def wait(self, timeout=None):
984 return self._callmethod('wait', (timeout,))
985
986class NamespaceProxy(BaseProxy):
987 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
988 def __getattr__(self, key):
989 if key[0] == '_':
990 return object.__getattribute__(self, key)
991 callmethod = object.__getattribute__(self, '_callmethod')
992 return callmethod('__getattribute__', (key,))
993 def __setattr__(self, key, value):
994 if key[0] == '_':
995 return object.__setattr__(self, key, value)
996 callmethod = object.__getattribute__(self, '_callmethod')
997 return callmethod('__setattr__', (key, value))
998 def __delattr__(self, key):
999 if key[0] == '_':
1000 return object.__delattr__(self, key)
1001 callmethod = object.__getattribute__(self, '_callmethod')
1002 return callmethod('__delattr__', (key,))
1003
1004
1005class ValueProxy(BaseProxy):
1006 _exposed_ = ('get', 'set')
1007 def get(self):
1008 return self._callmethod('get')
1009 def set(self, value):
1010 return self._callmethod('set', (value,))
1011 value = property(get, set)
1012
1013
1014BaseListProxy = MakeProxyType('BaseListProxy', (
1015 '__add__', '__contains__', '__delitem__', '__delslice__',
1016 '__getitem__', '__getslice__', '__len__', '__mul__',
1017 '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1018 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1019 'reverse', 'sort', '__imul__'
1020 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1021class ListProxy(BaseListProxy):
1022 def __iadd__(self, value):
1023 self._callmethod('extend', (value,))
1024 return self
1025 def __imul__(self, value):
1026 self._callmethod('__imul__', (value,))
1027 return self
1028
1029
1030DictProxy = MakeProxyType('DictProxy', (
1031 '__contains__', '__delitem__', '__getitem__', '__len__',
1032 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1033 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1034 ))
1035
1036
1037ArrayProxy = MakeProxyType('ArrayProxy', (
1038 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1039 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1040
1041
1042PoolProxy = MakeProxyType('PoolProxy', (
1043 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1044 'map', 'map_async', 'terminate'
1045 ))
1046PoolProxy._method_to_typeid_ = {
1047 'apply_async': 'AsyncResult',
1048 'map_async': 'AsyncResult',
1049 'imap': 'Iterator',
1050 'imap_unordered': 'Iterator'
1051 }
1052
1053#
1054# Definition of SyncManager
1055#
1056
1057class SyncManager(BaseManager):
1058 '''
1059 Subclass of `BaseManager` which supports a number of shared object types.
1060
1061 The types registered are those intended for the synchronization
1062 of threads, plus `dict`, `list` and `Namespace`.
1063
1064 The `multiprocessing.Manager()` function creates started instances of
1065 this class.
1066 '''
1067
1068SyncManager.register('Queue', queue.Queue)
1069SyncManager.register('JoinableQueue', queue.Queue)
1070SyncManager.register('Event', threading.Event, EventProxy)
1071SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1072SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1073SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1074SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1075 AcquirerProxy)
1076SyncManager.register('Condition', threading.Condition, ConditionProxy)
1077SyncManager.register('Pool', Pool, PoolProxy)
1078SyncManager.register('list', list, ListProxy)
1079SyncManager.register('dict', dict, DictProxy)
1080SyncManager.register('Value', Value, ValueProxy)
1081SyncManager.register('Array', Array, ArrayProxy)
1082SyncManager.register('Namespace', Namespace, NamespaceProxy)
1083
1084# types returned by methods of PoolProxy
1085SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1086SyncManager.register('AsyncResult', create_method=False)