blob: 7ee50bed52fe90fbc60943d50fd194186cf2d729 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
21import copy_reg
22import Queue
23
24from traceback import format_exc
25from multiprocessing import Process, current_process, active_children, Pool, util, connection
26from multiprocessing.process import AuthenticationString
27from multiprocessing.forking import exit, Popen, assert_spawning
28from multiprocessing.util import Finalize, info
29
30try:
31 from cPickle import PicklingError
32except ImportError:
33 from pickle import PicklingError
34
35#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000036# Register some things for pickling
37#
38
39def reduce_array(a):
40 return array.array, (a.typecode, a.tostring())
41copy_reg.pickle(array.array, reduce_array)
42
43view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000044
45#
46# Type for identifying shared objects
47#
48
49class Token(object):
50 '''
51 Type to uniquely indentify a shared object
52 '''
53 __slots__ = ('typeid', 'address', 'id')
54
55 def __init__(self, typeid, address, id):
56 (self.typeid, self.address, self.id) = (typeid, address, id)
57
58 def __getstate__(self):
59 return (self.typeid, self.address, self.id)
60
61 def __setstate__(self, state):
62 (self.typeid, self.address, self.id) = state
63
64 def __repr__(self):
65 return 'Token(typeid=%r, address=%r, id=%r)' % \
66 (self.typeid, self.address, self.id)
67
68#
69# Function for communication with a manager's server process
70#
71
72def dispatch(c, id, methodname, args=(), kwds={}):
73 '''
74 Send a message to manager using connection `c` and return response
75 '''
76 c.send((id, methodname, args, kwds))
77 kind, result = c.recv()
78 if kind == '#RETURN':
79 return result
80 raise convert_to_error(kind, result)
81
82def convert_to_error(kind, result):
83 if kind == '#ERROR':
84 return result
85 elif kind == '#TRACEBACK':
86 assert type(result) is str
87 return RemoteError(result)
88 elif kind == '#UNSERIALIZABLE':
89 assert type(result) is str
90 return RemoteError('Unserializable message: %s\n' % result)
91 else:
92 return ValueError('Unrecognized message type')
93
94class RemoteError(Exception):
95 def __str__(self):
96 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
97
98#
99# Functions for finding the method names of an object
100#
101
102def all_methods(obj):
103 '''
104 Return a list of names of methods of `obj`
105 '''
106 temp = []
107 for name in dir(obj):
108 func = getattr(obj, name)
109 if hasattr(func, '__call__'):
110 temp.append(name)
111 return temp
112
113def public_methods(obj):
114 '''
115 Return a list of names of methods of `obj` which do not start with '_'
116 '''
117 return [name for name in all_methods(obj) if name[0] != '_']
118
119#
120# Server which is run in a process controlled by a manager
121#
122
123class Server(object):
124 '''
125 Server class which runs in a process controlled by a manager object
126 '''
127 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
128 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
129
130 def __init__(self, registry, address, authkey, serializer):
131 assert isinstance(authkey, bytes)
132 self.registry = registry
133 self.authkey = AuthenticationString(authkey)
134 Listener, Client = listener_client[serializer]
135
136 # do authentication later
137 self.listener = Listener(address=address, backlog=5)
138 self.address = self.listener.address
139
140 self.id_to_obj = {0: (None, ())}
141 self.id_to_refcount = {}
142 self.mutex = threading.RLock()
143 self.stop = 0
144
145 def serve_forever(self):
146 '''
147 Run the server forever
148 '''
149 current_process()._manager_server = self
150 try:
151 try:
152 while 1:
153 try:
154 c = self.listener.accept()
155 except (OSError, IOError):
156 continue
157 t = threading.Thread(target=self.handle_request, args=(c,))
158 t.set_daemon(True)
159 t.start()
160 except (KeyboardInterrupt, SystemExit):
161 pass
162 finally:
163 self.stop = 999
164 self.listener.close()
165
166 def handle_request(self, c):
167 '''
168 Handle a new connection
169 '''
170 funcname = result = request = None
171 try:
172 connection.deliver_challenge(c, self.authkey)
173 connection.answer_challenge(c, self.authkey)
174 request = c.recv()
175 ignore, funcname, args, kwds = request
176 assert funcname in self.public, '%r unrecognized' % funcname
177 func = getattr(self, funcname)
178 except Exception:
179 msg = ('#TRACEBACK', format_exc())
180 else:
181 try:
182 result = func(c, *args, **kwds)
183 except Exception:
184 msg = ('#TRACEBACK', format_exc())
185 else:
186 msg = ('#RETURN', result)
187 try:
188 c.send(msg)
189 except Exception, e:
190 try:
191 c.send(('#TRACEBACK', format_exc()))
192 except Exception:
193 pass
194 util.info('Failure to send message: %r', msg)
195 util.info(' ... request was %r', request)
196 util.info(' ... exception was %r', e)
197
198 c.close()
199
200 def serve_client(self, conn):
201 '''
202 Handle requests from the proxies in a particular process/thread
203 '''
204 util.debug('starting server thread to service %r',
205 threading.current_thread().get_name())
206
207 recv = conn.recv
208 send = conn.send
209 id_to_obj = self.id_to_obj
210
211 while not self.stop:
212
213 try:
214 methodname = obj = None
215 request = recv()
216 ident, methodname, args, kwds = request
217 obj, exposed, gettypeid = id_to_obj[ident]
218
219 if methodname not in exposed:
220 raise AttributeError(
221 'method %r of %r object is not in exposed=%r' %
222 (methodname, type(obj), exposed)
223 )
224
225 function = getattr(obj, methodname)
226
227 try:
228 res = function(*args, **kwds)
229 except Exception, e:
230 msg = ('#ERROR', e)
231 else:
232 typeid = gettypeid and gettypeid.get(methodname, None)
233 if typeid:
234 rident, rexposed = self.create(conn, typeid, res)
235 token = Token(typeid, self.address, rident)
236 msg = ('#PROXY', (rexposed, token))
237 else:
238 msg = ('#RETURN', res)
239
240 except AttributeError:
241 if methodname is None:
242 msg = ('#TRACEBACK', format_exc())
243 else:
244 try:
245 fallback_func = self.fallback_mapping[methodname]
246 result = fallback_func(
247 self, conn, ident, obj, *args, **kwds
248 )
249 msg = ('#RETURN', result)
250 except Exception:
251 msg = ('#TRACEBACK', format_exc())
252
253 except EOFError:
254 util.debug('got EOF -- exiting thread serving %r',
255 threading.current_thread().get_name())
256 sys.exit(0)
257
258 except Exception:
259 msg = ('#TRACEBACK', format_exc())
260
261 try:
262 try:
263 send(msg)
264 except Exception, e:
265 send(('#UNSERIALIZABLE', repr(msg)))
266 except Exception, e:
267 util.info('exception in thread serving %r',
268 threading.current_thread().get_name())
269 util.info(' ... message was %r', msg)
270 util.info(' ... exception was %r', e)
271 conn.close()
272 sys.exit(1)
273
274 def fallback_getvalue(self, conn, ident, obj):
275 return obj
276
277 def fallback_str(self, conn, ident, obj):
278 return str(obj)
279
280 def fallback_repr(self, conn, ident, obj):
281 return repr(obj)
282
283 fallback_mapping = {
284 '__str__':fallback_str,
285 '__repr__':fallback_repr,
286 '#GETVALUE':fallback_getvalue
287 }
288
289 def dummy(self, c):
290 pass
291
292 def debug_info(self, c):
293 '''
294 Return some info --- useful to spot problems with refcounting
295 '''
296 self.mutex.acquire()
297 try:
298 result = []
299 keys = self.id_to_obj.keys()
300 keys.sort()
301 for ident in keys:
302 if ident != 0:
303 result.append(' %s: refcount=%s\n %s' %
304 (ident, self.id_to_refcount[ident],
305 str(self.id_to_obj[ident][0])[:75]))
306 return '\n'.join(result)
307 finally:
308 self.mutex.release()
309
310 def number_of_objects(self, c):
311 '''
312 Number of shared objects
313 '''
314 return len(self.id_to_obj) - 1 # don't count ident=0
315
316 def shutdown(self, c):
317 '''
318 Shutdown this process
319 '''
320 try:
321 try:
322 util.debug('manager received shutdown message')
323 c.send(('#RETURN', None))
324
325 if sys.stdout != sys.__stdout__:
326 util.debug('resetting stdout, stderr')
327 sys.stdout = sys.__stdout__
328 sys.stderr = sys.__stderr__
329
330 util._run_finalizers(0)
331
332 for p in active_children():
333 util.debug('terminating a child process of manager')
334 p.terminate()
335
336 for p in active_children():
337 util.debug('terminating a child process of manager')
338 p.join()
339
340 util._run_finalizers()
341 util.info('manager exiting with exitcode 0')
342 except:
343 import traceback
344 traceback.print_exc()
345 finally:
346 exit(0)
347
348 def create(self, c, typeid, *args, **kwds):
349 '''
350 Create a new shared object and return its id
351 '''
352 self.mutex.acquire()
353 try:
354 callable, exposed, method_to_typeid, proxytype = \
355 self.registry[typeid]
356
357 if callable is None:
358 assert len(args) == 1 and not kwds
359 obj = args[0]
360 else:
361 obj = callable(*args, **kwds)
362
363 if exposed is None:
364 exposed = public_methods(obj)
365 if method_to_typeid is not None:
366 assert type(method_to_typeid) is dict
367 exposed = list(exposed) + list(method_to_typeid)
368
369 ident = '%x' % id(obj) # convert to string because xmlrpclib
370 # only has 32 bit signed integers
371 util.debug('%r callable returned object with id %r', typeid, ident)
372
373 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
374 if ident not in self.id_to_refcount:
375 self.id_to_refcount[ident] = None
376 return ident, tuple(exposed)
377 finally:
378 self.mutex.release()
379
380 def get_methods(self, c, token):
381 '''
382 Return the methods of the shared object indicated by token
383 '''
384 return tuple(self.id_to_obj[token.id][1])
385
386 def accept_connection(self, c, name):
387 '''
388 Spawn a new thread to serve this connection
389 '''
390 threading.current_thread().set_name(name)
391 c.send(('#RETURN', None))
392 self.serve_client(c)
393
394 def incref(self, c, ident):
395 self.mutex.acquire()
396 try:
397 try:
398 self.id_to_refcount[ident] += 1
399 except TypeError:
400 assert self.id_to_refcount[ident] is None
401 self.id_to_refcount[ident] = 1
402 finally:
403 self.mutex.release()
404
405 def decref(self, c, ident):
406 self.mutex.acquire()
407 try:
408 assert self.id_to_refcount[ident] >= 1
409 self.id_to_refcount[ident] -= 1
410 if self.id_to_refcount[ident] == 0:
411 del self.id_to_obj[ident], self.id_to_refcount[ident]
412 util.debug('disposing of obj with id %d', ident)
413 finally:
414 self.mutex.release()
415
416#
417# Class to represent state of a manager
418#
419
420class State(object):
421 __slots__ = ['value']
422 INITIAL = 0
423 STARTED = 1
424 SHUTDOWN = 2
425
426#
427# Mapping from serializer name to Listener and Client types
428#
429
430listener_client = {
431 'pickle' : (connection.Listener, connection.Client),
432 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
433 }
434
435#
436# Definition of BaseManager
437#
438
439class BaseManager(object):
440 '''
441 Base class for managers
442 '''
443 _registry = {}
444 _Server = Server
445
446 def __init__(self, address=None, authkey=None, serializer='pickle'):
447 if authkey is None:
448 authkey = current_process().get_authkey()
449 self._address = address # XXX not final address if eg ('', 0)
450 self._authkey = AuthenticationString(authkey)
451 self._state = State()
452 self._state.value = State.INITIAL
453 self._serializer = serializer
454 self._Listener, self._Client = listener_client[serializer]
455
456 def __reduce__(self):
457 return type(self).from_address, \
458 (self._address, self._authkey, self._serializer)
459
460 def get_server(self):
461 '''
462 Return server object with serve_forever() method and address attribute
463 '''
464 assert self._state.value == State.INITIAL
465 return Server(self._registry, self._address,
466 self._authkey, self._serializer)
467
468 def connect(self):
469 '''
470 Connect manager object to the server process
471 '''
472 Listener, Client = listener_client[self._serializer]
473 conn = Client(self._address, authkey=self._authkey)
474 dispatch(conn, None, 'dummy')
475 self._state.value = State.STARTED
476
477 def start(self):
478 '''
479 Spawn a server process for this manager object
480 '''
481 assert self._state.value == State.INITIAL
482
483 # pipe over which we will retrieve address of server
484 reader, writer = connection.Pipe(duplex=False)
485
486 # spawn process which runs a server
487 self._process = Process(
488 target=type(self)._run_server,
489 args=(self._registry, self._address, self._authkey,
490 self._serializer, writer),
491 )
492 ident = ':'.join(str(i) for i in self._process._identity)
493 self._process.set_name(type(self).__name__ + '-' + ident)
494 self._process.start()
495
496 # get address of server
497 writer.close()
498 self._address = reader.recv()
499 reader.close()
500
501 # register a finalizer
502 self._state.value = State.STARTED
503 self.shutdown = util.Finalize(
504 self, type(self)._finalize_manager,
505 args=(self._process, self._address, self._authkey,
506 self._state, self._Client),
507 exitpriority=0
508 )
509
510 @classmethod
511 def _run_server(cls, registry, address, authkey, serializer, writer):
512 '''
513 Create a server, report its address and run it
514 '''
515 # create server
516 server = cls._Server(registry, address, authkey, serializer)
517
518 # inform parent process of the server's address
519 writer.send(server.address)
520 writer.close()
521
522 # run the manager
523 util.info('manager serving at %r', server.address)
524 server.serve_forever()
525
526 def _create(self, typeid, *args, **kwds):
527 '''
528 Create a new shared object; return the token and exposed tuple
529 '''
530 assert self._state.value == State.STARTED, 'server not yet started'
531 conn = self._Client(self._address, authkey=self._authkey)
532 try:
533 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
534 finally:
535 conn.close()
536 return Token(typeid, self._address, id), exposed
537
538 def join(self, timeout=None):
539 '''
540 Join the manager process (if it has been spawned)
541 '''
542 self._process.join(timeout)
543
544 def _debug_info(self):
545 '''
546 Return some info about the servers shared objects and connections
547 '''
548 conn = self._Client(self._address, authkey=self._authkey)
549 try:
550 return dispatch(conn, None, 'debug_info')
551 finally:
552 conn.close()
553
554 def _number_of_objects(self):
555 '''
556 Return the number of shared objects
557 '''
558 conn = self._Client(self._address, authkey=self._authkey)
559 try:
560 return dispatch(conn, None, 'number_of_objects')
561 finally:
562 conn.close()
563
564 def __enter__(self):
565 return self
566
567 def __exit__(self, exc_type, exc_val, exc_tb):
568 self.shutdown()
569
570 @staticmethod
571 def _finalize_manager(process, address, authkey, state, _Client):
572 '''
573 Shutdown the manager process; will be registered as a finalizer
574 '''
575 if process.is_alive():
576 util.info('sending shutdown message to manager')
577 try:
578 conn = _Client(address, authkey=authkey)
579 try:
580 dispatch(conn, None, 'shutdown')
581 finally:
582 conn.close()
583 except Exception:
584 pass
585
586 process.join(timeout=0.2)
587 if process.is_alive():
588 util.info('manager still alive')
589 if hasattr(process, 'terminate'):
590 util.info('trying to `terminate()` manager process')
591 process.terminate()
592 process.join(timeout=0.1)
593 if process.is_alive():
594 util.info('manager still alive after terminate')
595
596 state.value = State.SHUTDOWN
597 try:
598 del BaseProxy._address_to_local[address]
599 except KeyError:
600 pass
601
602 address = property(lambda self: self._address)
603
604 @classmethod
605 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
606 method_to_typeid=None, create_method=True):
607 '''
608 Register a typeid with the manager type
609 '''
610 if '_registry' not in cls.__dict__:
611 cls._registry = cls._registry.copy()
612
613 if proxytype is None:
614 proxytype = AutoProxy
615
616 exposed = exposed or getattr(proxytype, '_exposed_', None)
617
618 method_to_typeid = method_to_typeid or \
619 getattr(proxytype, '_method_to_typeid_', None)
620
621 if method_to_typeid:
622 for key, value in method_to_typeid.items():
623 assert type(key) is str, '%r is not a string' % key
624 assert type(value) is str, '%r is not a string' % value
625
626 cls._registry[typeid] = (
627 callable, exposed, method_to_typeid, proxytype
628 )
629
630 if create_method:
631 def temp(self, *args, **kwds):
632 util.debug('requesting creation of a shared %r object', typeid)
633 token, exp = self._create(typeid, *args, **kwds)
634 proxy = proxytype(
635 token, self._serializer, manager=self,
636 authkey=self._authkey, exposed=exp
637 )
638 return proxy
639 temp.__name__ = typeid
640 setattr(cls, typeid, temp)
641
642#
643# Subclass of set which get cleared after a fork
644#
645
646class ProcessLocalSet(set):
647 def __init__(self):
648 util.register_after_fork(self, lambda obj: obj.clear())
649 def __reduce__(self):
650 return type(self), ()
651
652#
653# Definition of BaseProxy
654#
655
656class BaseProxy(object):
657 '''
658 A base for proxies of shared objects
659 '''
660 _address_to_local = {}
661 _mutex = util.ForkAwareThreadLock()
662
663 def __init__(self, token, serializer, manager=None,
664 authkey=None, exposed=None, incref=True):
665 BaseProxy._mutex.acquire()
666 try:
667 tls_idset = BaseProxy._address_to_local.get(token.address, None)
668 if tls_idset is None:
669 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
670 BaseProxy._address_to_local[token.address] = tls_idset
671 finally:
672 BaseProxy._mutex.release()
673
674 # self._tls is used to record the connection used by this
675 # thread to communicate with the manager at token.address
676 self._tls = tls_idset[0]
677
678 # self._idset is used to record the identities of all shared
679 # objects for which the current process owns references and
680 # which are in the manager at token.address
681 self._idset = tls_idset[1]
682
683 self._token = token
684 self._id = self._token.id
685 self._manager = manager
686 self._serializer = serializer
687 self._Client = listener_client[serializer][1]
688
689 if authkey is not None:
690 self._authkey = AuthenticationString(authkey)
691 elif self._manager is not None:
692 self._authkey = self._manager._authkey
693 else:
694 self._authkey = current_process().get_authkey()
695
696 if incref:
697 self._incref()
698
699 util.register_after_fork(self, BaseProxy._after_fork)
700
701 def _connect(self):
702 util.debug('making connection to manager')
703 name = current_process().get_name()
704 if threading.current_thread().get_name() != 'MainThread':
705 name += '|' + threading.current_thread().get_name()
706 conn = self._Client(self._token.address, authkey=self._authkey)
707 dispatch(conn, None, 'accept_connection', (name,))
708 self._tls.connection = conn
709
710 def _callmethod(self, methodname, args=(), kwds={}):
711 '''
712 Try to call a method of the referrent and return a copy of the result
713 '''
714 try:
715 conn = self._tls.connection
716 except AttributeError:
717 util.debug('thread %r does not own a connection',
718 threading.current_thread().get_name())
719 self._connect()
720 conn = self._tls.connection
721
722 conn.send((self._id, methodname, args, kwds))
723 kind, result = conn.recv()
724
725 if kind == '#RETURN':
726 return result
727 elif kind == '#PROXY':
728 exposed, token = result
729 proxytype = self._manager._registry[token.typeid][-1]
730 return proxytype(
731 token, self._serializer, manager=self._manager,
732 authkey=self._authkey, exposed=exposed
733 )
734 raise convert_to_error(kind, result)
735
736 def _getvalue(self):
737 '''
738 Get a copy of the value of the referent
739 '''
740 return self._callmethod('#GETVALUE')
741
742 def _incref(self):
743 conn = self._Client(self._token.address, authkey=self._authkey)
744 dispatch(conn, None, 'incref', (self._id,))
745 util.debug('INCREF %r', self._token.id)
746
747 self._idset.add(self._id)
748
749 state = self._manager and self._manager._state
750
751 self._close = util.Finalize(
752 self, BaseProxy._decref,
753 args=(self._token, self._authkey, state,
754 self._tls, self._idset, self._Client),
755 exitpriority=10
756 )
757
758 @staticmethod
759 def _decref(token, authkey, state, tls, idset, _Client):
760 idset.discard(token.id)
761
762 # check whether manager is still alive
763 if state is None or state.value == State.STARTED:
764 # tell manager this process no longer cares about referent
765 try:
766 util.debug('DECREF %r', token.id)
767 conn = _Client(token.address, authkey=authkey)
768 dispatch(conn, None, 'decref', (token.id,))
769 except Exception, e:
770 util.debug('... decref failed %s', e)
771
772 else:
773 util.debug('DECREF %r -- manager already shutdown', token.id)
774
775 # check whether we can close this thread's connection because
776 # the process owns no more references to objects for this manager
777 if not idset and hasattr(tls, 'connection'):
778 util.debug('thread %r has no more proxies so closing conn',
779 threading.current_thread().get_name())
780 tls.connection.close()
781 del tls.connection
782
783 def _after_fork(self):
784 self._manager = None
785 try:
786 self._incref()
787 except Exception, e:
788 # the proxy may just be for a manager which has shutdown
789 util.info('incref failed: %s' % e)
790
791 def __reduce__(self):
792 kwds = {}
793 if Popen.thread_is_spawning():
794 kwds['authkey'] = self._authkey
795
796 if getattr(self, '_isauto', False):
797 kwds['exposed'] = self._exposed_
798 return (RebuildProxy,
799 (AutoProxy, self._token, self._serializer, kwds))
800 else:
801 return (RebuildProxy,
802 (type(self), self._token, self._serializer, kwds))
803
804 def __deepcopy__(self, memo):
805 return self._getvalue()
806
807 def __repr__(self):
808 return '<%s object, typeid %r at %s>' % \
809 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
810
811 def __str__(self):
812 '''
813 Return representation of the referent (or a fall-back if that fails)
814 '''
815 try:
816 return self._callmethod('__repr__')
817 except Exception:
818 return repr(self)[:-1] + "; '__str__()' failed>"
819
820#
821# Function used for unpickling
822#
823
824def RebuildProxy(func, token, serializer, kwds):
825 '''
826 Function used for unpickling proxy objects.
827
828 If possible the shared object is returned, or otherwise a proxy for it.
829 '''
830 server = getattr(current_process(), '_manager_server', None)
831
832 if server and server.address == token.address:
833 return server.id_to_obj[token.id][0]
834 else:
835 incref = (
836 kwds.pop('incref', True) and
837 not getattr(current_process(), '_inheriting', False)
838 )
839 return func(token, serializer, incref=incref, **kwds)
840
841#
842# Functions to create proxies and proxy types
843#
844
845def MakeProxyType(name, exposed, _cache={}):
846 '''
847 Return an proxy type whose methods are given by `exposed`
848 '''
849 exposed = tuple(exposed)
850 try:
851 return _cache[(name, exposed)]
852 except KeyError:
853 pass
854
855 dic = {}
856
857 for meth in exposed:
858 exec '''def %s(self, *args, **kwds):
859 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
860
861 ProxyType = type(name, (BaseProxy,), dic)
862 ProxyType._exposed_ = exposed
863 _cache[(name, exposed)] = ProxyType
864 return ProxyType
865
866
867def AutoProxy(token, serializer, manager=None, authkey=None,
868 exposed=None, incref=True):
869 '''
870 Return an auto-proxy for `token`
871 '''
872 _Client = listener_client[serializer][1]
873
874 if exposed is None:
875 conn = _Client(token.address, authkey=authkey)
876 try:
877 exposed = dispatch(conn, None, 'get_methods', (token,))
878 finally:
879 conn.close()
880
881 if authkey is None and manager is not None:
882 authkey = manager._authkey
883 if authkey is None:
884 authkey = current_process().get_authkey()
885
886 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
887 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
888 incref=incref)
889 proxy._isauto = True
890 return proxy
891
892#
893# Types/callables which we will register with SyncManager
894#
895
896class Namespace(object):
897 def __init__(self, **kwds):
898 self.__dict__.update(kwds)
899 def __repr__(self):
900 items = self.__dict__.items()
901 temp = []
902 for name, value in items:
903 if not name.startswith('_'):
904 temp.append('%s=%r' % (name, value))
905 temp.sort()
906 return 'Namespace(%s)' % str.join(', ', temp)
907
908class Value(object):
909 def __init__(self, typecode, value, lock=True):
910 self._typecode = typecode
911 self._value = value
912 def get(self):
913 return self._value
914 def set(self, value):
915 self._value = value
916 def __repr__(self):
917 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
918 value = property(get, set)
919
920def Array(typecode, sequence, lock=True):
921 return array.array(typecode, sequence)
922
923#
924# Proxy types used by SyncManager
925#
926
927class IteratorProxy(BaseProxy):
928 # XXX remove methods for Py3.0 and Py2.6
929 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
930 def __iter__(self):
931 return self
932 def __next__(self, *args):
933 return self._callmethod('__next__', args)
934 def next(self, *args):
935 return self._callmethod('next', args)
936 def send(self, *args):
937 return self._callmethod('send', args)
938 def throw(self, *args):
939 return self._callmethod('throw', args)
940 def close(self, *args):
941 return self._callmethod('close', args)
942
943
944class AcquirerProxy(BaseProxy):
945 _exposed_ = ('acquire', 'release')
946 def acquire(self, blocking=True):
947 return self._callmethod('acquire', (blocking,))
948 def release(self):
949 return self._callmethod('release')
950 def __enter__(self):
951 return self._callmethod('acquire')
952 def __exit__(self, exc_type, exc_val, exc_tb):
953 return self._callmethod('release')
954
955
956class ConditionProxy(AcquirerProxy):
957 # XXX will Condition.notfyAll() name be available in Py3.0?
958 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
959 def wait(self, timeout=None):
960 return self._callmethod('wait', (timeout,))
961 def notify(self):
962 return self._callmethod('notify')
963 def notify_all(self):
964 return self._callmethod('notify_all')
965
966class EventProxy(BaseProxy):
967 # XXX will Event.isSet name be available in Py3.0?
968 _exposed_ = ('isSet', 'set', 'clear', 'wait')
969 def is_set(self):
Benjamin Peterson0adfd932008-06-26 21:24:35 +0000970 return self._callmethod('is_set')
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000971 def set(self):
972 return self._callmethod('set')
973 def clear(self):
974 return self._callmethod('clear')
975 def wait(self, timeout=None):
976 return self._callmethod('wait', (timeout,))
977
978class NamespaceProxy(BaseProxy):
979 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
980 def __getattr__(self, key):
981 if key[0] == '_':
982 return object.__getattribute__(self, key)
983 callmethod = object.__getattribute__(self, '_callmethod')
984 return callmethod('__getattribute__', (key,))
985 def __setattr__(self, key, value):
986 if key[0] == '_':
987 return object.__setattr__(self, key, value)
988 callmethod = object.__getattribute__(self, '_callmethod')
989 return callmethod('__setattr__', (key, value))
990 def __delattr__(self, key):
991 if key[0] == '_':
992 return object.__delattr__(self, key)
993 callmethod = object.__getattribute__(self, '_callmethod')
994 return callmethod('__delattr__', (key,))
995
996
997class ValueProxy(BaseProxy):
998 _exposed_ = ('get', 'set')
999 def get(self):
1000 return self._callmethod('get')
1001 def set(self, value):
1002 return self._callmethod('set', (value,))
1003 value = property(get, set)
1004
1005
1006BaseListProxy = MakeProxyType('BaseListProxy', (
1007 '__add__', '__contains__', '__delitem__', '__delslice__',
1008 '__getitem__', '__getslice__', '__len__', '__mul__',
1009 '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1010 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1011 'reverse', 'sort', '__imul__'
1012 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1013class ListProxy(BaseListProxy):
1014 def __iadd__(self, value):
1015 self._callmethod('extend', (value,))
1016 return self
1017 def __imul__(self, value):
1018 self._callmethod('__imul__', (value,))
1019 return self
1020
1021
1022DictProxy = MakeProxyType('DictProxy', (
1023 '__contains__', '__delitem__', '__getitem__', '__len__',
1024 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1025 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1026 ))
1027
1028
1029ArrayProxy = MakeProxyType('ArrayProxy', (
1030 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1031 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1032
1033
1034PoolProxy = MakeProxyType('PoolProxy', (
1035 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1036 'map', 'map_async', 'terminate'
1037 ))
1038PoolProxy._method_to_typeid_ = {
1039 'apply_async': 'AsyncResult',
1040 'map_async': 'AsyncResult',
1041 'imap': 'Iterator',
1042 'imap_unordered': 'Iterator'
1043 }
1044
1045#
1046# Definition of SyncManager
1047#
1048
1049class SyncManager(BaseManager):
1050 '''
1051 Subclass of `BaseManager` which supports a number of shared object types.
1052
1053 The types registered are those intended for the synchronization
1054 of threads, plus `dict`, `list` and `Namespace`.
1055
1056 The `multiprocessing.Manager()` function creates started instances of
1057 this class.
1058 '''
1059
1060SyncManager.register('Queue', Queue.Queue)
1061SyncManager.register('JoinableQueue', Queue.Queue)
1062SyncManager.register('Event', threading.Event, EventProxy)
1063SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1064SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1065SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1066SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1067 AcquirerProxy)
1068SyncManager.register('Condition', threading.Condition, ConditionProxy)
1069SyncManager.register('Pool', Pool, PoolProxy)
1070SyncManager.register('list', list, ListProxy)
1071SyncManager.register('dict', dict, DictProxy)
1072SyncManager.register('Value', Value, ValueProxy)
1073SyncManager.register('Array', Array, ArrayProxy)
1074SyncManager.register('Namespace', Namespace, NamespaceProxy)
1075
1076# types returned by methods of PoolProxy
1077SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1078SyncManager.register('AsyncResult', create_method=False)