blob: 02e96b969b46087acc15161df81bbcb847d06a61 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000021import Queue
22
23from traceback import format_exc
24from multiprocessing import Process, current_process, active_children, Pool, util, connection
25from multiprocessing.process import AuthenticationString
Jesse Noller13e9d582008-07-16 14:32:36 +000026from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000027from multiprocessing.util import Finalize, info
28
29try:
30 from cPickle import PicklingError
31except ImportError:
32 from pickle import PicklingError
33
34#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000035# Register some things for pickling
36#
37
def reduce_array(a):
    '''
    Return the pickle reduction `(callable, args)` for the array `a`

    Calling `callable(*args)` rebuilds an equal array from the type code
    and the raw machine representation of the items.
    '''
    # tostring() is the legacy spelling of tobytes(); prefer the newer name
    # where the interpreter has it and fall back to tostring() otherwise
    serialize = getattr(a, 'tobytes', None) or a.tostring
    return array.array, (a.typecode, serialize())
# make ForkingPickler reduce array.array objects with reduce_array()
ForkingPickler.register(array.array, reduce_array)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000041
# types of the objects returned by the `items`, `keys` and `values`
# methods of a dict (in that order)
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000043
44#
45# Type for identifying shared objects
46#
47
class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # there is no instance __dict__ (see __slots__), so the pickled
        # state is just the three fields in a fixed order
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        typeid, address, id = state
        self.typeid = typeid
        self.address = address
        self.id = id

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
66
67#
68# Function for communication with a manager's server process
69#
70
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request to a manager over connection `c` and return the result

    The request sent is the tuple `(id, methodname, args, kwds)`.  The
    reply is a `(kind, result)` pair: for kind '#RETURN' the result is
    returned, anything else is turned into an exception by
    `convert_to_error()` and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
80
def convert_to_error(kind, result):
    '''
    Return (not raise) the exception described by a reply `(kind, result)`

    '#ERROR' replies already carry an exception object, '#TRACEBACK' and
    '#UNSERIALIZABLE' replies carry a string which is wrapped in a
    `RemoteError`; any other kind gives a `ValueError`.
    '''
    if kind == '#ERROR':
        return result
    if kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        assert type(result) is str
        if kind == '#TRACEBACK':
            return RemoteError(result)
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
92
class RemoteError(Exception):
    '''
    Exception whose first argument is a block of text (as built by
    `convert_to_error()`); `str()` frames that text between two rules
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
96
97#
98# Functions for finding the method names of an object
99#
100
def all_methods(obj):
    '''
    Return a list of the names of the attributes of `obj` (as reported
    by `dir()`) whose values have a `__call__` attribute
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
111
def public_methods(obj):
    '''
    Return a list of the method names of `obj` (see `all_methods()`),
    leaving out every name that begins with an underscore
    '''
    names = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        names.append(name)
    return names
117
118#
119# Server which is run in a process controlled by a manager
120#
121
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    `id_to_obj` maps an ident string to `(obj, exposed, method_to_typeid)`
    and `id_to_refcount` maps the same idents to reference counts; both
    are updated while holding `mutex`.
    '''
    # handle_request() refuses any function name which is not listed here
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever

        Every accepted connection is passed to handle_request() in a new
        daemon thread.  KeyboardInterrupt and SystemExit end the loop;
        on the way out `stop` is set and the listener is closed.
        '''
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the connection, reads one request, runs the named
        public function with `c` as its first argument and sends back
        either ('#RETURN', result) or ('#TRACEBACK', text).
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            if funcname not in self.public:
                # raised explicitly because an assert statement is
                # skipped under -O, which would disable this whitelist
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # the reply itself could not be sent: try to report that
            # instead, then log what happened
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `stop` is set.  Each request names an ident and a
        method; the reply is '#RETURN', '#PROXY', '#ERROR' or
        '#TRACEBACK' (or '#UNSERIALIZABLE' if the reply cannot be sent).
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result must itself be shared: register it
                        # and answer with a token instead of the value
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is missing or not exposed: see whether
                    # one of the fallback functions can answer instead
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # used by serve_client() when a requested method is unavailable
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request, runs finalizers, terminates and joins
        the child processes, and finally calls exit(0) whatever happened.
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately broad: the process exits in the finally
                # clause regardless, so just report what went wrong
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of method
        names.  The object's reference count is already incremented on
        return (see the comment in the body).
        '''
        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one to the reference count of the object with id `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Subtract one from the reference count of the object with id
        `ident`, forgetting the object once the count reaches zero
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                # idents are hex strings (see create()), so they cannot
                # be formatted with %d
                util.debug('disposing of obj with id %r', ident)
416
417#
418# Class to represent state of a manager
419#
420
class State(object):
    '''
    Holder for the state of a manager: `value` is set to one of the
    constants below
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = range(3)
426
427#
428# Mapping from serializer name to Listener and Client types
429#
430
# serializer name -> (Listener type, Client type); looked up by Server,
# BaseManager, BaseProxy and AutoProxy
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
435
436#
437# Definition of BaseManager
438#
439
class BaseManager(object):
    '''
    Base class for managers

    A manager is either started (`start()` spawns a server process) or
    connected to an already running server (`connect()`).  Shared types
    are added with the `register()` classmethod.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # NOTE(review): no `from_address` attribute is defined in the part
        # of this class visible here -- confirm it exists before relying
        # on pickling a manager
        return type(self).from_address, \
               (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # NOTE(review): this uses `Server` directly whereas _run_server()
        # uses `cls._Server` -- confirm whether that is intended
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the connection is only needed for this one request
            conn.close()
        self._state.value = State.STARTED

    def start(self):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer):
        '''
        Create a server, report its address and run it
        '''
        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        self._process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort only: a process which is still alive after
                # the join below gets terminated instead
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        Unless `create_method` is false a method named `typeid` is added
        to the class; it creates a shared object on the server and
        returns a proxy for it.
        '''
        # give this class its own registry instead of changing the one
        # it inherited
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items():
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the proxy now exists, so give back the reference which
                # the server took out when it created the object
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
644
645#
646# Subclass of set which get cleared after a fork
647#
648
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork and which
    always pickles as an empty set of the same type
    '''
    def __init__(self):
        def clear_after_fork(obj):
            obj.clear()
        util.register_after_fork(self, clear_after_fork)

    def __reduce__(self):
        return (type(self), ())
654
655#
656# Definition of BaseProxy
657#
658
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread local connection holder, ProcessLocalSet
    # of the ids referenced by this process); guarded by `_mutex`
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in
        `self._tls.connection`
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # the new proxy exists, so give back the reference which the
            # server took out when it created the object; this uses a
            # separate one-shot connection which must be closed again
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager about a new reference to the referent and
        register a finalizer which gives the reference back
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Give back one reference to the referent of `token`; failures are
        logged rather than raised
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
825
826#
827# Function used for unpickling
828#
829
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When the current process is running the manager server which owns
    the object, the object itself is returned; otherwise `func` is
    called to build a proxy for it.
    '''
    process = current_process()
    server = getattr(process, '_manager_server', None)

    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # take out a new reference unless the caller asked not to or the
    # process is flagged as `_inheriting`
    incref = (
        kwds.pop('incref', True) and
        not getattr(process, '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
846
847#
848# Functions to create proxies and proxy types
849#
850
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are cached by `(name, tuple(exposed))`, so asking again for
    the same combination gives back the same type object.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # generate one forwarding method per exposed name
    template = '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''
    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
871
872
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    If `exposed` is not given the list of method names is requested
    from the manager at `token.address`.  The key used is `authkey` if
    given, else the manager's key, else the current process's key.
    '''
    _Client = listener_client[serializer][1]

    # settle on the key *before* talking to the manager, so that the
    # get_methods request below is made with it and not with None
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # __reduce__() checks this flag so the proxy is rebuilt via AutoProxy
    proxy._isauto = True
    return proxy
897
898#
899# Types/callables which we will register with SyncManager
900#
901
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes whose names start with '_' are left out
        shown = sorted('%s=%r' % (name, value)
                       for name, value in self.__dict__.items()
                       if not name.startswith('_'))
        return 'Namespace(%s)' % ', '.join(shown)
913
class Value(object):
    '''
    Holder for a single value together with a type code

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields

    value = property(get, set)
925
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of type `typecode` initialised from `sequence`

    The `lock` argument is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
928
929#
930# Proxy types used by SyncManager
931#
932
class IteratorProxy(BaseProxy):
    '''
    Proxy which forwards the iterator/generator methods to the referent
    '''
    # XXX remove methods for Py3.0 and Py2.6
    _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')

    def __iter__(self):
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def next(self, *args):
        return self._callmethod('next', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
948
949
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire()` and `release()` methods; also
    usable in a `with` statement
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        args = (blocking,)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # forwarded without arguments, so the referent's own default
        # for `blocking` applies
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
960
961
class ConditionProxy(AcquirerProxy):
    '''
    Proxy which adds `wait()`, `notify()` and `notify_all()` to the
    methods of `AcquirerProxy`
    '''
    # XXX will Condition.notifyAll() name be available in Py3.0?
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        args = (timeout,)
        return self._callmethod('wait', args)

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
971
class EventProxy(BaseProxy):
    '''
    Proxy which forwards `is_set()`, `set()`, `clear()` and `wait()`
    to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        args = (timeout,)
        return self._callmethod('wait', args)
982
class NamespaceProxy(BaseProxy):
    '''
    Proxy which forwards attribute access to the referent

    Names starting with '_' are handled locally on the proxy object;
    all other names are read, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # fetch _callmethod through object so that the lookup cannot
        # end up back in this method
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__setattr__', (key, value))

    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        forward = object.__getattribute__(self, '_callmethod')
        return forward('__delattr__', (key,))
1000
1001
class ValueProxy(BaseProxy):
    '''
    Proxy which forwards `get()` and `set()` to the referent and offers
    them as the `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        args = (value,)
        return self._callmethod('set', args)

    value = property(get, set)
1009
1010
# XXX __getslice__ and __setslice__ unneeded in Py3.0
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__delslice__',
        '__getitem__', '__getslice__', '__len__', '__mul__',
        '__reversed__', '__rmul__', '__setitem__', '__setslice__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators change the referent and then
    return the proxy itself
    '''
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1025
1026
# proxy type which forwards the dict methods named below to the referent
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
1032
1033
# proxy type which forwards length, item and slice access to the referent
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1037
1038
# proxy type which forwards the pool methods named below to the referent
PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# method name -> typeid: the results of these methods are registered as
# shared objects of that typeid and returned as proxies (see
# Server.serve_client())
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
1049
1050#
1051# Definition of SyncManager
1052#
1053
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# (typeid, callable, proxytype) for every type which gets a creation
# method; a proxytype of None means that register() picks its default
for _typeid, _callable, _proxytype in (
        ('Queue', Queue.Queue, None),
        ('JoinableQueue', Queue.Queue, None),
        ('Event', threading.Event, EventProxy),
        ('Lock', threading.Lock, AcquirerProxy),
        ('RLock', threading.RLock, AcquirerProxy),
        ('Semaphore', threading.Semaphore, AcquirerProxy),
        ('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy),
        ('Condition', threading.Condition, ConditionProxy),
        ('Pool', Pool, PoolProxy),
        ('list', list, ListProxy),
        ('dict', dict, DictProxy),
        ('Value', Value, ValueProxy),
        ('Array', Array, ArrayProxy),
        ('Namespace', Namespace, NamespaceProxy),
        ):
    SyncManager.register(_typeid, _callable, _proxytype)
del _typeid, _callable, _proxytype

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)