blob: 4af2107743b6246399ccce786d062a3829f3950c [file] [log] [blame]
Benjamin Peterson190d56e2008-06-11 02:40:25 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
21import copy_reg
22import Queue
23
24from traceback import format_exc
25from multiprocessing import Process, current_process, active_children, Pool, util, connection
26from multiprocessing.process import AuthenticationString
27from multiprocessing.forking import exit, Popen, assert_spawning
28from multiprocessing.util import Finalize, info
29
30try:
31 from cPickle import PicklingError
32except ImportError:
33 from pickle import PicklingError
34
35#
36#
37#
38
39try:
40 bytes
41except NameError:
42 bytes = str # XXX not needed in Py2.6 and Py3.0
43
44#
45# Register some things for pickling
46#
47
48def reduce_array(a):
49 return array.array, (a.typecode, a.tostring())
50copy_reg.pickle(array.array, reduce_array)
51
52view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
53if view_types[0] is not list: # XXX only needed in Py3.0
54 def rebuild_as_list(obj):
55 return list, (list(obj),)
56 for view_type in view_types:
57 copy_reg.pickle(view_type, rebuild_as_list)
58
59#
60# Type for identifying shared objects
61#
62
63class Token(object):
64 '''
65 Type to uniquely indentify a shared object
66 '''
67 __slots__ = ('typeid', 'address', 'id')
68
69 def __init__(self, typeid, address, id):
70 (self.typeid, self.address, self.id) = (typeid, address, id)
71
72 def __getstate__(self):
73 return (self.typeid, self.address, self.id)
74
75 def __setstate__(self, state):
76 (self.typeid, self.address, self.id) = state
77
78 def __repr__(self):
79 return 'Token(typeid=%r, address=%r, id=%r)' % \
80 (self.typeid, self.address, self.id)
81
82#
83# Function for communication with a manager's server process
84#
85
86def dispatch(c, id, methodname, args=(), kwds={}):
87 '''
88 Send a message to manager using connection `c` and return response
89 '''
90 c.send((id, methodname, args, kwds))
91 kind, result = c.recv()
92 if kind == '#RETURN':
93 return result
94 raise convert_to_error(kind, result)
95
96def convert_to_error(kind, result):
97 if kind == '#ERROR':
98 return result
99 elif kind == '#TRACEBACK':
100 assert type(result) is str
101 return RemoteError(result)
102 elif kind == '#UNSERIALIZABLE':
103 assert type(result) is str
104 return RemoteError('Unserializable message: %s\n' % result)
105 else:
106 return ValueError('Unrecognized message type')
107
108class RemoteError(Exception):
109 def __str__(self):
110 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
111
112#
113# Functions for finding the method names of an object
114#
115
116def all_methods(obj):
117 '''
118 Return a list of names of methods of `obj`
119 '''
120 temp = []
121 for name in dir(obj):
122 func = getattr(obj, name)
123 if hasattr(func, '__call__'):
124 temp.append(name)
125 return temp
126
127def public_methods(obj):
128 '''
129 Return a list of names of methods of `obj` which do not start with '_'
130 '''
131 return [name for name in all_methods(obj) if name[0] != '_']
132
133#
134# Server which is run in a process controlled by a manager
135#
136
137class Server(object):
138 '''
139 Server class which runs in a process controlled by a manager object
140 '''
141 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
142 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
143
144 def __init__(self, registry, address, authkey, serializer):
145 assert isinstance(authkey, bytes)
146 self.registry = registry
147 self.authkey = AuthenticationString(authkey)
148 Listener, Client = listener_client[serializer]
149
150 # do authentication later
151 self.listener = Listener(address=address, backlog=5)
152 self.address = self.listener.address
153
154 self.id_to_obj = {0: (None, ())}
155 self.id_to_refcount = {}
156 self.mutex = threading.RLock()
157 self.stop = 0
158
159 def serve_forever(self):
160 '''
161 Run the server forever
162 '''
163 current_process()._manager_server = self
164 try:
165 try:
166 while 1:
167 try:
168 c = self.listener.accept()
169 except (OSError, IOError):
170 continue
171 t = threading.Thread(target=self.handle_request, args=(c,))
172 t.setDaemon(True)
173 t.start()
174 except (KeyboardInterrupt, SystemExit):
175 pass
176 finally:
177 self.stop = 999
178 self.listener.close()
179
180 def handle_request(self, c):
181 '''
182 Handle a new connection
183 '''
184 funcname = result = request = None
185 try:
186 connection.deliver_challenge(c, self.authkey)
187 connection.answer_challenge(c, self.authkey)
188 request = c.recv()
189 ignore, funcname, args, kwds = request
190 assert funcname in self.public, '%r unrecognized' % funcname
191 func = getattr(self, funcname)
192 except Exception:
193 msg = ('#TRACEBACK', format_exc())
194 else:
195 try:
196 result = func(c, *args, **kwds)
197 except Exception:
198 msg = ('#TRACEBACK', format_exc())
199 else:
200 msg = ('#RETURN', result)
201 try:
202 c.send(msg)
203 except Exception, e:
204 try:
205 c.send(('#TRACEBACK', format_exc()))
206 except Exception:
207 pass
208 util.info('Failure to send message: %r', msg)
209 util.info(' ... request was %r', request)
210 util.info(' ... exception was %r', e)
211
212 c.close()
213
214 def serve_client(self, conn):
215 '''
216 Handle requests from the proxies in a particular process/thread
217 '''
218 util.debug('starting server thread to service %r',
219 threading.currentThread().getName())
220
221 recv = conn.recv
222 send = conn.send
223 id_to_obj = self.id_to_obj
224
225 while not self.stop:
226
227 try:
228 methodname = obj = None
229 request = recv()
230 ident, methodname, args, kwds = request
231 obj, exposed, gettypeid = id_to_obj[ident]
232
233 if methodname not in exposed:
234 raise AttributeError(
235 'method %r of %r object is not in exposed=%r' %
236 (methodname, type(obj), exposed)
237 )
238
239 function = getattr(obj, methodname)
240
241 try:
242 res = function(*args, **kwds)
243 except Exception, e:
244 msg = ('#ERROR', e)
245 else:
246 typeid = gettypeid and gettypeid.get(methodname, None)
247 if typeid:
248 rident, rexposed = self.create(conn, typeid, res)
249 token = Token(typeid, self.address, rident)
250 msg = ('#PROXY', (rexposed, token))
251 else:
252 msg = ('#RETURN', res)
253
254 except AttributeError:
255 if methodname is None:
256 msg = ('#TRACEBACK', format_exc())
257 else:
258 try:
259 fallback_func = self.fallback_mapping[methodname]
260 result = fallback_func(
261 self, conn, ident, obj, *args, **kwds
262 )
263 msg = ('#RETURN', result)
264 except Exception:
265 msg = ('#TRACEBACK', format_exc())
266
267 except EOFError:
268 util.debug('got EOF -- exiting thread serving %r',
269 threading.currentThread().getName())
270 sys.exit(0)
271
272 except Exception:
273 msg = ('#TRACEBACK', format_exc())
274
275 try:
276 try:
277 send(msg)
278 except Exception, e:
279 send(('#UNSERIALIZABLE', repr(msg)))
280 except Exception, e:
281 util.info('exception in thread serving %r',
282 threading.currentThread().getName())
283 util.info(' ... message was %r', msg)
284 util.info(' ... exception was %r', e)
285 conn.close()
286 sys.exit(1)
287
288 def fallback_getvalue(self, conn, ident, obj):
289 return obj
290
291 def fallback_str(self, conn, ident, obj):
292 return str(obj)
293
294 def fallback_repr(self, conn, ident, obj):
295 return repr(obj)
296
297 fallback_mapping = {
298 '__str__':fallback_str,
299 '__repr__':fallback_repr,
300 '#GETVALUE':fallback_getvalue
301 }
302
303 def dummy(self, c):
304 pass
305
306 def debug_info(self, c):
307 '''
308 Return some info --- useful to spot problems with refcounting
309 '''
310 self.mutex.acquire()
311 try:
312 result = []
313 keys = self.id_to_obj.keys()
314 keys.sort()
315 for ident in keys:
316 if ident != 0:
317 result.append(' %s: refcount=%s\n %s' %
318 (ident, self.id_to_refcount[ident],
319 str(self.id_to_obj[ident][0])[:75]))
320 return '\n'.join(result)
321 finally:
322 self.mutex.release()
323
324 def number_of_objects(self, c):
325 '''
326 Number of shared objects
327 '''
328 return len(self.id_to_obj) - 1 # don't count ident=0
329
330 def shutdown(self, c):
331 '''
332 Shutdown this process
333 '''
334 try:
335 try:
336 util.debug('manager received shutdown message')
337 c.send(('#RETURN', None))
338
339 if sys.stdout != sys.__stdout__:
340 util.debug('resetting stdout, stderr')
341 sys.stdout = sys.__stdout__
342 sys.stderr = sys.__stderr__
343
344 util._run_finalizers(0)
345
346 for p in active_children():
347 util.debug('terminating a child process of manager')
348 p.terminate()
349
350 for p in active_children():
351 util.debug('terminating a child process of manager')
352 p.join()
353
354 util._run_finalizers()
355 util.info('manager exiting with exitcode 0')
356 except:
357 import traceback
358 traceback.print_exc()
359 finally:
360 exit(0)
361
362 def create(self, c, typeid, *args, **kwds):
363 '''
364 Create a new shared object and return its id
365 '''
366 self.mutex.acquire()
367 try:
368 callable, exposed, method_to_typeid, proxytype = \
369 self.registry[typeid]
370
371 if callable is None:
372 assert len(args) == 1 and not kwds
373 obj = args[0]
374 else:
375 obj = callable(*args, **kwds)
376
377 if exposed is None:
378 exposed = public_methods(obj)
379 if method_to_typeid is not None:
380 assert type(method_to_typeid) is dict
381 exposed = list(exposed) + list(method_to_typeid)
382
383 ident = '%x' % id(obj) # convert to string because xmlrpclib
384 # only has 32 bit signed integers
385 util.debug('%r callable returned object with id %r', typeid, ident)
386
387 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
388 if ident not in self.id_to_refcount:
389 self.id_to_refcount[ident] = None
390 return ident, tuple(exposed)
391 finally:
392 self.mutex.release()
393
394 def get_methods(self, c, token):
395 '''
396 Return the methods of the shared object indicated by token
397 '''
398 return tuple(self.id_to_obj[token.id][1])
399
400 def accept_connection(self, c, name):
401 '''
402 Spawn a new thread to serve this connection
403 '''
404 threading.currentThread().setName(name)
405 c.send(('#RETURN', None))
406 self.serve_client(c)
407
408 def incref(self, c, ident):
409 self.mutex.acquire()
410 try:
411 try:
412 self.id_to_refcount[ident] += 1
413 except TypeError:
414 assert self.id_to_refcount[ident] is None
415 self.id_to_refcount[ident] = 1
416 finally:
417 self.mutex.release()
418
419 def decref(self, c, ident):
420 self.mutex.acquire()
421 try:
422 assert self.id_to_refcount[ident] >= 1
423 self.id_to_refcount[ident] -= 1
424 if self.id_to_refcount[ident] == 0:
425 del self.id_to_obj[ident], self.id_to_refcount[ident]
426 util.debug('disposing of obj with id %d', ident)
427 finally:
428 self.mutex.release()
429
430#
431# Class to represent state of a manager
432#
433
434class State(object):
435 __slots__ = ['value']
436 INITIAL = 0
437 STARTED = 1
438 SHUTDOWN = 2
439
440#
441# Mapping from serializer name to Listener and Client types
442#
443
444listener_client = {
445 'pickle' : (connection.Listener, connection.Client),
446 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
447 }
448
449#
450# Definition of BaseManager
451#
452
453class BaseManager(object):
454 '''
455 Base class for managers
456 '''
457 _registry = {}
458 _Server = Server
459
460 def __init__(self, address=None, authkey=None, serializer='pickle'):
461 if authkey is None:
462 authkey = current_process().get_authkey()
463 self._address = address # XXX not final address if eg ('', 0)
464 self._authkey = AuthenticationString(authkey)
465 self._state = State()
466 self._state.value = State.INITIAL
467 self._serializer = serializer
468 self._Listener, self._Client = listener_client[serializer]
469
470 def __reduce__(self):
471 return type(self).from_address, \
472 (self._address, self._authkey, self._serializer)
473
474 def get_server(self):
475 '''
476 Return server object with serve_forever() method and address attribute
477 '''
478 assert self._state.value == State.INITIAL
479 return Server(self._registry, self._address,
480 self._authkey, self._serializer)
481
482 def connect(self):
483 '''
484 Connect manager object to the server process
485 '''
486 Listener, Client = listener_client[self._serializer]
487 conn = Client(self._address, authkey=self._authkey)
488 dispatch(conn, None, 'dummy')
489 self._state.value = State.STARTED
490
491 def start(self):
492 '''
493 Spawn a server process for this manager object
494 '''
495 assert self._state.value == State.INITIAL
496
497 # pipe over which we will retrieve address of server
498 reader, writer = connection.Pipe(duplex=False)
499
500 # spawn process which runs a server
501 self._process = Process(
502 target=type(self)._run_server,
503 args=(self._registry, self._address, self._authkey,
504 self._serializer, writer),
505 )
506 ident = ':'.join(str(i) for i in self._process._identity)
507 self._process.set_name(type(self).__name__ + '-' + ident)
508 self._process.start()
509
510 # get address of server
511 writer.close()
512 self._address = reader.recv()
513 reader.close()
514
515 # register a finalizer
516 self._state.value = State.STARTED
517 self.shutdown = util.Finalize(
518 self, type(self)._finalize_manager,
519 args=(self._process, self._address, self._authkey,
520 self._state, self._Client),
521 exitpriority=0
522 )
523
524 @classmethod
525 def _run_server(cls, registry, address, authkey, serializer, writer):
526 '''
527 Create a server, report its address and run it
528 '''
529 # create server
530 server = cls._Server(registry, address, authkey, serializer)
531
532 # inform parent process of the server's address
533 writer.send(server.address)
534 writer.close()
535
536 # run the manager
537 util.info('manager serving at %r', server.address)
538 server.serve_forever()
539
540 def _create(self, typeid, *args, **kwds):
541 '''
542 Create a new shared object; return the token and exposed tuple
543 '''
544 assert self._state.value == State.STARTED, 'server not yet started'
545 conn = self._Client(self._address, authkey=self._authkey)
546 try:
547 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
548 finally:
549 conn.close()
550 return Token(typeid, self._address, id), exposed
551
552 def join(self, timeout=None):
553 '''
554 Join the manager process (if it has been spawned)
555 '''
556 self._process.join(timeout)
557
558 def _debug_info(self):
559 '''
560 Return some info about the servers shared objects and connections
561 '''
562 conn = self._Client(self._address, authkey=self._authkey)
563 try:
564 return dispatch(conn, None, 'debug_info')
565 finally:
566 conn.close()
567
568 def _number_of_objects(self):
569 '''
570 Return the number of shared objects
571 '''
572 conn = self._Client(self._address, authkey=self._authkey)
573 try:
574 return dispatch(conn, None, 'number_of_objects')
575 finally:
576 conn.close()
577
578 def __enter__(self):
579 return self
580
581 def __exit__(self, exc_type, exc_val, exc_tb):
582 self.shutdown()
583
584 @staticmethod
585 def _finalize_manager(process, address, authkey, state, _Client):
586 '''
587 Shutdown the manager process; will be registered as a finalizer
588 '''
589 if process.is_alive():
590 util.info('sending shutdown message to manager')
591 try:
592 conn = _Client(address, authkey=authkey)
593 try:
594 dispatch(conn, None, 'shutdown')
595 finally:
596 conn.close()
597 except Exception:
598 pass
599
600 process.join(timeout=0.2)
601 if process.is_alive():
602 util.info('manager still alive')
603 if hasattr(process, 'terminate'):
604 util.info('trying to `terminate()` manager process')
605 process.terminate()
606 process.join(timeout=0.1)
607 if process.is_alive():
608 util.info('manager still alive after terminate')
609
610 state.value = State.SHUTDOWN
611 try:
612 del BaseProxy._address_to_local[address]
613 except KeyError:
614 pass
615
616 address = property(lambda self: self._address)
617
618 @classmethod
619 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
620 method_to_typeid=None, create_method=True):
621 '''
622 Register a typeid with the manager type
623 '''
624 if '_registry' not in cls.__dict__:
625 cls._registry = cls._registry.copy()
626
627 if proxytype is None:
628 proxytype = AutoProxy
629
630 exposed = exposed or getattr(proxytype, '_exposed_', None)
631
632 method_to_typeid = method_to_typeid or \
633 getattr(proxytype, '_method_to_typeid_', None)
634
635 if method_to_typeid:
636 for key, value in method_to_typeid.items():
637 assert type(key) is str, '%r is not a string' % key
638 assert type(value) is str, '%r is not a string' % value
639
640 cls._registry[typeid] = (
641 callable, exposed, method_to_typeid, proxytype
642 )
643
644 if create_method:
645 def temp(self, *args, **kwds):
646 util.debug('requesting creation of a shared %r object', typeid)
647 token, exp = self._create(typeid, *args, **kwds)
648 proxy = proxytype(
649 token, self._serializer, manager=self,
650 authkey=self._authkey, exposed=exp
651 )
652 return proxy
653 temp.__name__ = typeid
654 setattr(cls, typeid, temp)
655
656#
657# Subclass of set which get cleared after a fork
658#
659
660class ProcessLocalSet(set):
661 def __init__(self):
662 util.register_after_fork(self, lambda obj: obj.clear())
663 def __reduce__(self):
664 return type(self), ()
665
666#
667# Definition of BaseProxy
668#
669
670class BaseProxy(object):
671 '''
672 A base for proxies of shared objects
673 '''
674 _address_to_local = {}
675 _mutex = util.ForkAwareThreadLock()
676
677 def __init__(self, token, serializer, manager=None,
678 authkey=None, exposed=None, incref=True):
679 BaseProxy._mutex.acquire()
680 try:
681 tls_idset = BaseProxy._address_to_local.get(token.address, None)
682 if tls_idset is None:
683 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
684 BaseProxy._address_to_local[token.address] = tls_idset
685 finally:
686 BaseProxy._mutex.release()
687
688 # self._tls is used to record the connection used by this
689 # thread to communicate with the manager at token.address
690 self._tls = tls_idset[0]
691
692 # self._idset is used to record the identities of all shared
693 # objects for which the current process owns references and
694 # which are in the manager at token.address
695 self._idset = tls_idset[1]
696
697 self._token = token
698 self._id = self._token.id
699 self._manager = manager
700 self._serializer = serializer
701 self._Client = listener_client[serializer][1]
702
703 if authkey is not None:
704 self._authkey = AuthenticationString(authkey)
705 elif self._manager is not None:
706 self._authkey = self._manager._authkey
707 else:
708 self._authkey = current_process().get_authkey()
709
710 if incref:
711 self._incref()
712
713 util.register_after_fork(self, BaseProxy._after_fork)
714
715 def _connect(self):
716 util.debug('making connection to manager')
717 name = current_process().get_name()
718 if threading.currentThread().getName() != 'MainThread':
719 name += '|' + threading.currentThread().getName()
720 conn = self._Client(self._token.address, authkey=self._authkey)
721 dispatch(conn, None, 'accept_connection', (name,))
722 self._tls.connection = conn
723
724 def _callmethod(self, methodname, args=(), kwds={}):
725 '''
726 Try to call a method of the referrent and return a copy of the result
727 '''
728 try:
729 conn = self._tls.connection
730 except AttributeError:
731 util.debug('thread %r does not own a connection',
732 threading.currentThread().getName())
733 self._connect()
734 conn = self._tls.connection
735
736 conn.send((self._id, methodname, args, kwds))
737 kind, result = conn.recv()
738
739 if kind == '#RETURN':
740 return result
741 elif kind == '#PROXY':
742 exposed, token = result
743 proxytype = self._manager._registry[token.typeid][-1]
744 return proxytype(
745 token, self._serializer, manager=self._manager,
746 authkey=self._authkey, exposed=exposed
747 )
748 raise convert_to_error(kind, result)
749
750 def _getvalue(self):
751 '''
752 Get a copy of the value of the referent
753 '''
754 return self._callmethod('#GETVALUE')
755
756 def _incref(self):
757 conn = self._Client(self._token.address, authkey=self._authkey)
758 dispatch(conn, None, 'incref', (self._id,))
759 util.debug('INCREF %r', self._token.id)
760
761 self._idset.add(self._id)
762
763 state = self._manager and self._manager._state
764
765 self._close = util.Finalize(
766 self, BaseProxy._decref,
767 args=(self._token, self._authkey, state,
768 self._tls, self._idset, self._Client),
769 exitpriority=10
770 )
771
772 @staticmethod
773 def _decref(token, authkey, state, tls, idset, _Client):
774 idset.discard(token.id)
775
776 # check whether manager is still alive
777 if state is None or state.value == State.STARTED:
778 # tell manager this process no longer cares about referent
779 try:
780 util.debug('DECREF %r', token.id)
781 conn = _Client(token.address, authkey=authkey)
782 dispatch(conn, None, 'decref', (token.id,))
783 except Exception, e:
784 util.debug('... decref failed %s', e)
785
786 else:
787 util.debug('DECREF %r -- manager already shutdown', token.id)
788
789 # check whether we can close this thread's connection because
790 # the process owns no more references to objects for this manager
791 if not idset and hasattr(tls, 'connection'):
792 util.debug('thread %r has no more proxies so closing conn',
793 threading.currentThread().getName())
794 tls.connection.close()
795 del tls.connection
796
797 def _after_fork(self):
798 self._manager = None
799 try:
800 self._incref()
801 except Exception, e:
802 # the proxy may just be for a manager which has shutdown
803 util.info('incref failed: %s' % e)
804
805 def __reduce__(self):
806 kwds = {}
807 if Popen.thread_is_spawning():
808 kwds['authkey'] = self._authkey
809
810 if getattr(self, '_isauto', False):
811 kwds['exposed'] = self._exposed_
812 return (RebuildProxy,
813 (AutoProxy, self._token, self._serializer, kwds))
814 else:
815 return (RebuildProxy,
816 (type(self), self._token, self._serializer, kwds))
817
818 def __deepcopy__(self, memo):
819 return self._getvalue()
820
821 def __repr__(self):
822 return '<%s object, typeid %r at %s>' % \
823 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
824
825 def __str__(self):
826 '''
827 Return representation of the referent (or a fall-back if that fails)
828 '''
829 try:
830 return self._callmethod('__repr__')
831 except Exception:
832 return repr(self)[:-1] + "; '__str__()' failed>"
833
834#
835# Function used for unpickling
836#
837
838def RebuildProxy(func, token, serializer, kwds):
839 '''
840 Function used for unpickling proxy objects.
841
842 If possible the shared object is returned, or otherwise a proxy for it.
843 '''
844 server = getattr(current_process(), '_manager_server', None)
845
846 if server and server.address == token.address:
847 return server.id_to_obj[token.id][0]
848 else:
849 incref = (
850 kwds.pop('incref', True) and
851 not getattr(current_process(), '_inheriting', False)
852 )
853 return func(token, serializer, incref=incref, **kwds)
854
855#
856# Functions to create proxies and proxy types
857#
858
859def MakeProxyType(name, exposed, _cache={}):
860 '''
861 Return an proxy type whose methods are given by `exposed`
862 '''
863 exposed = tuple(exposed)
864 try:
865 return _cache[(name, exposed)]
866 except KeyError:
867 pass
868
869 dic = {}
870
871 for meth in exposed:
872 exec '''def %s(self, *args, **kwds):
873 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
874
875 ProxyType = type(name, (BaseProxy,), dic)
876 ProxyType._exposed_ = exposed
877 _cache[(name, exposed)] = ProxyType
878 return ProxyType
879
880
881def AutoProxy(token, serializer, manager=None, authkey=None,
882 exposed=None, incref=True):
883 '''
884 Return an auto-proxy for `token`
885 '''
886 _Client = listener_client[serializer][1]
887
888 if exposed is None:
889 conn = _Client(token.address, authkey=authkey)
890 try:
891 exposed = dispatch(conn, None, 'get_methods', (token,))
892 finally:
893 conn.close()
894
895 if authkey is None and manager is not None:
896 authkey = manager._authkey
897 if authkey is None:
898 authkey = current_process().get_authkey()
899
900 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
901 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
902 incref=incref)
903 proxy._isauto = True
904 return proxy
905
906#
907# Types/callables which we will register with SyncManager
908#
909
910class Namespace(object):
911 def __init__(self, **kwds):
912 self.__dict__.update(kwds)
913 def __repr__(self):
914 items = self.__dict__.items()
915 temp = []
916 for name, value in items:
917 if not name.startswith('_'):
918 temp.append('%s=%r' % (name, value))
919 temp.sort()
920 return 'Namespace(%s)' % str.join(', ', temp)
921
922class Value(object):
923 def __init__(self, typecode, value, lock=True):
924 self._typecode = typecode
925 self._value = value
926 def get(self):
927 return self._value
928 def set(self, value):
929 self._value = value
930 def __repr__(self):
931 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
932 value = property(get, set)
933
934def Array(typecode, sequence, lock=True):
935 return array.array(typecode, sequence)
936
937#
938# Proxy types used by SyncManager
939#
940
941class IteratorProxy(BaseProxy):
942 # XXX remove methods for Py3.0 and Py2.6
943 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
944 def __iter__(self):
945 return self
946 def __next__(self, *args):
947 return self._callmethod('__next__', args)
948 def next(self, *args):
949 return self._callmethod('next', args)
950 def send(self, *args):
951 return self._callmethod('send', args)
952 def throw(self, *args):
953 return self._callmethod('throw', args)
954 def close(self, *args):
955 return self._callmethod('close', args)
956
957
958class AcquirerProxy(BaseProxy):
959 _exposed_ = ('acquire', 'release')
960 def acquire(self, blocking=True):
961 return self._callmethod('acquire', (blocking,))
962 def release(self):
963 return self._callmethod('release')
964 def __enter__(self):
965 return self._callmethod('acquire')
966 def __exit__(self, exc_type, exc_val, exc_tb):
967 return self._callmethod('release')
968
969
970class ConditionProxy(AcquirerProxy):
971 # XXX will Condition.notfyAll() name be available in Py3.0?
972 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notifyAll')
973 def wait(self, timeout=None):
974 return self._callmethod('wait', (timeout,))
975 def notify(self):
976 return self._callmethod('notify')
977 def notify_all(self):
978 return self._callmethod('notifyAll')
979
980class EventProxy(BaseProxy):
981 # XXX will Event.isSet name be available in Py3.0?
982 _exposed_ = ('isSet', 'set', 'clear', 'wait')
983 def is_set(self):
984 return self._callmethod('isSet')
985 def set(self):
986 return self._callmethod('set')
987 def clear(self):
988 return self._callmethod('clear')
989 def wait(self, timeout=None):
990 return self._callmethod('wait', (timeout,))
991
992class NamespaceProxy(BaseProxy):
993 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
994 def __getattr__(self, key):
995 if key[0] == '_':
996 return object.__getattribute__(self, key)
997 callmethod = object.__getattribute__(self, '_callmethod')
998 return callmethod('__getattribute__', (key,))
999 def __setattr__(self, key, value):
1000 if key[0] == '_':
1001 return object.__setattr__(self, key, value)
1002 callmethod = object.__getattribute__(self, '_callmethod')
1003 return callmethod('__setattr__', (key, value))
1004 def __delattr__(self, key):
1005 if key[0] == '_':
1006 return object.__delattr__(self, key)
1007 callmethod = object.__getattribute__(self, '_callmethod')
1008 return callmethod('__delattr__', (key,))
1009
1010
1011class ValueProxy(BaseProxy):
1012 _exposed_ = ('get', 'set')
1013 def get(self):
1014 return self._callmethod('get')
1015 def set(self, value):
1016 return self._callmethod('set', (value,))
1017 value = property(get, set)
1018
1019
1020BaseListProxy = MakeProxyType('BaseListProxy', (
1021 '__add__', '__contains__', '__delitem__', '__delslice__',
1022 '__getitem__', '__getslice__', '__len__', '__mul__',
1023 '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1024 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1025 'reverse', 'sort', '__imul__'
1026 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1027class ListProxy(BaseListProxy):
1028 def __iadd__(self, value):
1029 self._callmethod('extend', (value,))
1030 return self
1031 def __imul__(self, value):
1032 self._callmethod('__imul__', (value,))
1033 return self
1034
1035
1036DictProxy = MakeProxyType('DictProxy', (
1037 '__contains__', '__delitem__', '__getitem__', '__len__',
1038 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1039 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1040 ))
1041
1042
1043ArrayProxy = MakeProxyType('ArrayProxy', (
1044 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1045 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1046
1047
1048PoolProxy = MakeProxyType('PoolProxy', (
1049 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1050 'map', 'map_async', 'terminate'
1051 ))
1052PoolProxy._method_to_typeid_ = {
1053 'apply_async': 'AsyncResult',
1054 'map_async': 'AsyncResult',
1055 'imap': 'Iterator',
1056 'imap_unordered': 'Iterator'
1057 }
1058
1059#
1060# Definition of SyncManager
1061#
1062
1063class SyncManager(BaseManager):
1064 '''
1065 Subclass of `BaseManager` which supports a number of shared object types.
1066
1067 The types registered are those intended for the synchronization
1068 of threads, plus `dict`, `list` and `Namespace`.
1069
1070 The `multiprocessing.Manager()` function creates started instances of
1071 this class.
1072 '''
1073
1074SyncManager.register('Queue', Queue.Queue)
1075SyncManager.register('JoinableQueue', Queue.Queue)
1076SyncManager.register('Event', threading.Event, EventProxy)
1077SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1078SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1079SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1080SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1081 AcquirerProxy)
1082SyncManager.register('Condition', threading.Condition, ConditionProxy)
1083SyncManager.register('Pool', Pool, PoolProxy)
1084SyncManager.register('list', list, ListProxy)
1085SyncManager.register('dict', dict, DictProxy)
1086SyncManager.register('Value', Value, ValueProxy)
1087SyncManager.register('Array', Array, ArrayProxy)
1088SyncManager.register('Namespace', Namespace, NamespaceProxy)
1089
1090# types returned by methods of PoolProxy
1091SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1092SyncManager.register('AsyncResult', create_method=False)