blob: ba5af5701f42eb07a8ec2ae0f65eb758ef4d3c56 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
# Names exported by `from multiprocessing.managers import *`
__all__ = [
    'BaseManager',
    'SyncManager',
    'BaseProxy',
    'Token',
    ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000021import Queue
22
23from traceback import format_exc
24from multiprocessing import Process, current_process, active_children, Pool, util, connection
25from multiprocessing.process import AuthenticationString
Jesse Noller13e9d582008-07-16 14:32:36 +000026from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000027from multiprocessing.util import Finalize, info
28
29try:
30 from cPickle import PicklingError
31except ImportError:
32 from pickle import PicklingError
33
34#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000035# Register some things for pickling
36#
37
def reduce_array(a):
    '''
    Reduction function used to pickle `array.array` instances

    Returns the `(callable, args)` pair used by the pickle protocol: calling
    `array.array(typecode, data)` with the returned arguments rebuilds an
    array equal to `a`.
    '''
    # `tostring()` is missing from newer interpreters while `tobytes()` is
    # missing from older ones, so use whichever the array actually provides.
    serialize = getattr(a, 'tobytes', None) or a.tostring
    return array.array, (a.typecode, serialize())
# Teach ForkingPickler to pickle arrays by way of reduce_array()
ForkingPickler.register(array.array, reduce_array)

# Types of the objects returned by the items(), keys() and values()
# methods of a dict, in that order
view_types = [type({}.items()), type({}.keys()), type({}.values())]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000043
44#
45# Type for identifying shared objects
46#
47
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token records the typeid the object was registered under, the
    address of the manager which holds it and the object's id there.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # no instance __dict__ exists (see __slots__), so the state is
        # spelled out as a plain tuple
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
66
67#
68# Function for communication with a manager's server process
69#
70
def dispatch(c, id, methodname, args=(), kwds=None):
    '''
    Send a message to manager using connection `c` and return response

    The message is the tuple `(id, methodname, args, kwds)`.  The reply is
    a `(kind, result)` pair: `result` is returned when `kind` is '#RETURN',
    otherwise the exception built by `convert_to_error()` is raised.
    '''
    # Build a fresh dict for every call instead of sharing one mutable
    # default object between all callers.
    if kwds is None:
        kwds = {}
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)
80
def convert_to_error(kind, result):
    '''
    Return the exception instance which corresponds to an error reply

    `kind` is the tag of the reply and `result` its payload.  The
    exception is returned, not raised.
    '''
    if kind == '#ERROR':
        # the payload already is the exception object
        return result
    if kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        # both of these carry their payload as text
        assert type(result) is str
        if kind == '#TRACEBACK':
            return RemoteError(result)
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
92
class RemoteError(Exception):
    '''
    Exception whose first argument is text describing a remote failure
    '''
    def __str__(self):
        # frame the text between two rules of 75 dashes
        rule = '-' * 75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
96
97#
98# Functions for finding the method names of an object
99#
100
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name counts as a method when the attribute it refers to has a
    `__call__` attribute.
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
111
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] != '_':
            public.append(name)
    return public
117
118#
119# Server which is run in a process controlled by a manager
120#
121
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    Shared objects are kept in `id_to_obj`, keyed by an identifier, and
    their reference counts in `id_to_refcount`; `create()`, `incref()`,
    `decref()` and `debug_info()` hold `mutex` while using these tables.
    '''
    # names of the methods which handle_request() agrees to run for a client
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create a listener at `address` of the type registered for
        `serializer`; `registry` maps each typeid to its registration tuple
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        # ident 0 is a placeholder entry, it is never counted as an object
        self.id_to_obj = {0: (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever

        Each accepted connection is handed to handle_request() in a new
        daemon thread.
        '''
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.set_daemon(True)
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            # a non-zero `stop` ends the loops in serve_client()
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one request naming a method listed
        in `public`, runs it and sends back ('#RETURN', result) or
        ('#TRACEBACK', text); the connection is then closed.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            assert funcname in self.public, '%r unrecognized' % funcname
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # sending the reply failed -- try to report that instead
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `self.stop` is set.  Every request is a tuple
        `(ident, methodname, args, kwds)` and is answered with a
        `(kind, payload)` tuple.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().get_name())

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # results of methods listed in the method-to-typeid
                    # mapping are shared and answered with a token
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is not available on the object itself:
                    # see whether a fallback implementation exists
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().get_name())
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().get_name())
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # used by serve_client() for method names it could not serve directly
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            # leave out the placeholder before sorting, so that only the
            # identifiers of real objects get compared with each other
            for ident in sorted(k for k in self.id_to_obj if k != 0):
                result.append(' %s: refcount=%s\n %s' %
                              (ident, self.id_to_refcount[ident],
                               str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident=0

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request, runs the finalizers, terminates and
        joins the active child processes and finally calls exit(0).
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately catches everything: the failure is printed
                # and the process exits from the finally clause regardless
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns the pair `(ident, exposed)` where `exposed` is a tuple of
        the method names which may be called on the object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # nothing to call: the only argument is the object itself
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                # None marks an object which has not been incref'ed yet
                self.id_to_refcount[ident] = None
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().set_name(name)
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one to the reference count of the object with id `ident`
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except TypeError:
                # first reference: the count is still None
                assert self.id_to_refcount[ident] is None
                self.id_to_refcount[ident] = 1

    def decref(self, c, ident):
        '''
        Take one off the reference count of the object with id `ident`
        and forget the object once the count reaches zero
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                # idents made by create() are strings, so %d cannot be used
                util.debug('disposing of obj with id %r', ident)
414
415#
416# Class to represent state of a manager
417#
418
class State(object):
    '''
    Holder for the state of a manager, kept in the `value` attribute
    '''
    __slots__ = ['value']

    # the values which `value` takes
    INITIAL, STARTED, SHUTDOWN = range(3)
424
425#
426# Mapping from serializer name to Listener and Client types
427#
428
# serializer name -> (listener type, client type)
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
433
434#
435# Definition of BaseManager
436#
437
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns a server process with start() or attaches to
    an existing one with connect().  Types are made available with the
    register() class method.
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        if authkey is None:
            authkey = current_process().get_authkey()
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # NOTE(review): no `from_address` is defined on BaseManager in this
        # file, so this presumably fails unless a subclass provides one --
        # TODO confirm
        return type(self).from_address, \
               (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the connection only serves this one request, so do not
            # leave it open
            conn.close()
        self._state.value = State.STARTED

    def start(self):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.set_name(type(self).__name__ + '-' + ident)
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer):
        '''
        Create a server, report its address and run it
        '''
        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        self._process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer which start() stores on the instance
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort only: whether the process is still alive is
                # checked below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        Unless `create_method` is false a method called `typeid` is added
        to the class; it asks the server to create an object of that type
        and returns a proxy for it.
        '''
        # give this class a registry of its own on first use, so that the
        # registry of the base class is left alone
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items():
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
640
641#
642# Subclass of set which gets cleared after a fork
643#
644
class ProcessLocalSet(set):
    '''
    Set which registers itself to be emptied after a fork and which
    pickles as a new, empty set of the same type
    '''
    def __init__(self):
        def _forget(obj):
            obj.clear()
        util.register_after_fork(self, _forget)

    def __reduce__(self):
        # the contents are left out on purpose
        return (type(self), ())
650
651#
652# Definition of BaseProxy
653#
654
class BaseProxy(object):
    '''
    A base for proxies of shared objects

    A proxy forwards method calls to the manager which holds the
    referent, over a connection which is made once per thread.
    '''
    # manager address -> (thread local storage, set of ids)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().get_authkey()

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in
        `self._tls.connection`
        '''
        util.debug('making connection to manager')
        name = current_process().get_name()
        if threading.current_thread().get_name() != 'MainThread':
            name += '|' + threading.current_thread().get_name()
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds=None):
        '''
        Try to call a method of the referrent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy for the object named
        by the token in the reply; error replies are raised as exceptions.
        '''
        # a fresh dict per call rather than one shared mutable default
        if kwds is None:
            kwds = {}

        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().get_name())
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            return proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager about a new reference to the referent and
        register the finalizer which gives that reference back
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # the connection only serves this one request
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Give back one reference to the object named by `token`

        When `idset` has become empty the connection stored in `tls`, if
        any, is closed and removed.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    # close the one-shot connection even if the request
                    # failed
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().get_name())
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
818
819#
820# Function used for unpickling
821#
822
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    server = getattr(current_process(), '_manager_server', None)

    # inside the server process which holds the referent: hand back the
    # object itself
    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # 'incref' is always taken out of kwds before the remaining entries
    # are passed on to func
    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(current_process(), '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
839
840#
841# Functions to create proxies and proxy types
842#
843
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    The type is a subclass of `BaseProxy` called `name`; each of its
    methods forwards to `_callmethod()`.  Types are cached by
    `(name, exposed)`, so equal arguments give back the same type.
    '''
    # `_cache` is a mutable default on purpose: it is the store which is
    # shared by all calls
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        # the call form of exec is accepted by Python 2 and by Python 3,
        # the `exec ... in dic` statement form only by Python 2
        exec('''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType
864
865
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    When `exposed` is not given the manager at `token.address` is asked
    for the method names of the referent.
    '''
    _Client = listener_client[serializer][1]

    # Settle the authkey before any connection is made, so that the
    # get_methods request below is authenticated as well when only
    # `manager` (or nothing) was given.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().get_authkey()

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # checked by BaseProxy.__reduce__ when the proxy is pickled
    proxy._isauto = True
    return proxy
890
891#
892# Types/callables which we will register with SyncManager
893#
894
class Namespace(object):
    '''
    Plain object whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes whose names start with '_' are left out
        shown = sorted('%s=%r' % (name, value)
                       for name, value in self.__dict__.items()
                       if not name.startswith('_'))
        return 'Namespace(%s)' % ', '.join(shown)
906
class Value(object):
    '''
    Holder for a single value together with its typecode

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        details = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % details

    # attribute style access to get() and set()
    value = property(get, set)
918
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of type `typecode` initialised from `sequence`

    The `lock` argument is accepted but not used.
    '''
    shared = array.array(typecode, sequence)
    return shared
921
922#
923# Proxy types used by SyncManager
924#
925
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator or generator held by a manager
    '''
    # XXX remove methods for Py3.0 and Py2.6
    _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')

    def __iter__(self):
        return self

    # every method below passes its positional arguments on to the
    # method of the same name of the referent

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def next(self, *args):
        return self._callmethod('next', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
941
942
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire() and release() methods; it can be
    used in a `with` statement
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        return self._callmethod('acquire', (blocking,))

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # entering the block acquires, with the referent's own defaults
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
953
954
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable: an acquirer which can also wait and
    notify
    '''
    # XXX will Condition.notifyAll() name be available in Py3.0?
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
964
class EventProxy(BaseProxy):
    '''
    Proxy for an event object held by a manager
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
975
class NamespaceProxy(BaseProxy):
    '''
    Proxy which passes attribute access on to the referent

    Names starting with '_' are handled by the proxy object itself, all
    other names are sent to the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__delattr__', (key,))
        return object.__delattr__(self, key)
993
994
class ValueProxy(BaseProxy):
    '''
    Proxy for an object with get() and set() methods
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    # attribute style access to get() and set()
    value = property(get, set)
1002
1003
# Proxy type for the methods of a list which can simply be forwarded
# XXX __getslice__ and __setslice__ unneeded in Py3.0
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__delslice__',
        '__getitem__', '__getslice__', '__len__', '__mul__',
        '__reversed__', '__rmul__', '__setitem__', '__setslice__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
    )
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `proxy += value` extends the referent
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1018
1019
# Proxy type for dicts
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
    )


# Proxy type for arrays
# XXX __getslice__ and __setslice__ unneeded in Py3.0
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'),
    )


# Proxy type for process pools
PoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'terminate',
    ),
    )
# typeids under which the results of these methods are shared
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
1042
1043#
1044# Definition of SyncManager
1045#
1046
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The registered types are the ones meant for the synchronization of
    threads, together with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''
1057
# types which use the default proxy type
SyncManager.register('Queue', callable=Queue.Queue)
SyncManager.register('JoinableQueue', callable=Queue.Queue)

# types with a proxy type of their own
SyncManager.register('Event', callable=threading.Event,
                     proxytype=EventProxy)
SyncManager.register('Lock', callable=threading.Lock,
                     proxytype=AcquirerProxy)
SyncManager.register('RLock', callable=threading.RLock,
                     proxytype=AcquirerProxy)
SyncManager.register('Semaphore', callable=threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', callable=threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', callable=threading.Condition,
                     proxytype=ConditionProxy)
SyncManager.register('Pool', callable=Pool, proxytype=PoolProxy)
SyncManager.register('list', callable=list, proxytype=ListProxy)
SyncManager.register('dict', callable=dict, proxytype=DictProxy)
SyncManager.register('Value', callable=Value, proxytype=ValueProxy)
SyncManager.register('Array', callable=Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', callable=Namespace,
                     proxytype=NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)