blob: 8850258ce58a6e443c942ad3167572b43aa03f12 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000021import Queue
22
23from traceback import format_exc
24from multiprocessing import Process, current_process, active_children, Pool, util, connection
25from multiprocessing.process import AuthenticationString
Jesse Noller13e9d582008-07-16 14:32:36 +000026from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000027from multiprocessing.util import Finalize, info
28
29try:
30 from cPickle import PicklingError
31except ImportError:
32 from pickle import PicklingError
33
34#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000035# Register some things for pickling
36#
37
def reduce_array(a):
    '''
    Reduction function used to pickle `array.array` instances.

    Returns the `(callable, args)` pair expected by the pickling
    machinery: calling `array.array(typecode, raw_bytes)` rebuilds an
    equal array.
    '''
    # `tostring()` is the historical spelling; newer interpreters only
    # provide `tobytes()`.  Prefer the new name and fall back to the old.
    to_bytes = getattr(a, 'tobytes', None) or a.tostring
    return array.array, (a.typecode, to_bytes())
# Make ForkingPickler use reduce_array() whenever it pickles an array.array.
ForkingPickler.register(array.array, reduce_array)

# Types of the objects returned by calling items(), keys() and values()
# on an empty dict.  NOTE(review): not referenced elsewhere in the part of
# the file visible here.
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000043
44#
45# Type for identifying shared objects
46#
47
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token is the triple `(typeid, address, id)`.  `__slots__` is used, so
    instances carry exactly these three attributes and nothing else.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # Pickled state is the plain triple (there is no __dict__).
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        # Unpack first so a malformed state fails before any attribute is set.
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
66
67#
68# Function for communication with a manager's server process
69#
70
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request sent is the tuple `(id, methodname, args, kwds)`.  The reply
    is a `(kind, result)` pair: for kind '#RETURN' the result is returned,
    any other kind is turned into an exception by `convert_to_error()` and
    raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
80
def convert_to_error(kind, result):
    '''
    Build (but do not raise) the exception matching a non-'#RETURN' reply

    '#ERROR' carries the exception object itself; '#TRACEBACK' and
    '#UNSERIALIZABLE' carry text which is wrapped in a `RemoteError`; any
    other kind yields a `ValueError`.
    '''
    if kind == '#ERROR':
        return result
    if kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        assert type(result) is str
        if kind == '#TRACEBACK':
            return RemoteError(result)
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
92
class RemoteError(Exception):
    '''
    Exception wrapping text (e.g. a formatted traceback) sent by the server
    '''
    def __str__(self):
        # Frame the text between two 75-character rules.
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
96
97#
98# Functions for finding the method names of an object
99#
100
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name from `dir(obj)` is included when the attribute it refers to has
    a `__call__` attribute.
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
111
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
117
118#
119# Server which is run in a process controlled by a manager
120#
121
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    The server keeps two tables guarded by `self.mutex`:
    `id_to_obj` maps an ident string to `(obj, exposed, method_to_typeid)`
    and `id_to_refcount` maps the same ident to its reference count.
    '''
    # Names of the methods a client may invoke through handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener for `address`; `serializer` selects the
        Listener/Client pair from `listener_client`.
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        # ident '0' is a placeholder entry; it is skipped by debug_info()
        # and not counted by number_of_objects()
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever

        Each accepted connection is handed to handle_request() in its own
        daemon thread.
        '''
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            # a true value makes the serve_client() loops terminate
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one request, runs the requested
        public method and sends back a ('#RETURN', value) or
        ('#TRACEBACK', text) reply before closing the connection.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit check rather than `assert`: this whitelist guards
            # which attributes a remote peer may call and must not vanish
            # when Python runs with -O.  Same exception type and message.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # best effort: try to report the send failure to the peer
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `self.stop` becomes true, EOF is received (thread exits
        with status 0) or a reply cannot be sent (thread exits with 1).
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # the referent's own exception is shipped to the caller
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # result must itself be shared: register it and
                        # reply with a token instead of the value
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # method not exposed/available: try the fallbacks
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Plain functions (not bound methods); serve_client() passes `self`
    # explicitly when calling them.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request, runs finalizers, terminates and joins
        child processes, then always exits with status 0.
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                # NOTE: same debug text as the loop above, but this loop
                # waits for the children rather than terminating them
                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately catch everything: report and still exit
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of method
        names.  The object's refcount is incremented before returning.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                     self.registry[typeid]

            if factory is None:
                # no factory registered: the object itself is passed in
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of `ident`; drop the object at zero
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
416
417#
418# Class to represent state of a manager
419#
420
class State(object):
    '''
    Holder for a manager's state; `value` is one of the constants below
    '''
    __slots__ = ['value']

    # possible settings of `value`
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2
426
427#
428# Mapping from serializer name to Listener and Client types
429#
430
# Maps a serializer name to its (Listener, Client) pair; index 0 is the
# listener type and index 1 the client type.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
435
436#
437# Definition of BaseManager
438#
439
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype); copied
    # per subclass on first register() so subclasses do not share entries
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        `authkey` defaults to the current process's authkey; `serializer`
        must be a key of `listener_client`.
        '''
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # No `from_address` classmethod is defined in this module, so
        # looking it up unconditionally raised AttributeError.  Honour it
        # if a subclass provides one, otherwise rebuild through the
        # constructor, whose positional parameters match the tuple below.
        rebuild = getattr(type(self), 'from_address', type(self))
        return rebuild, (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            # a successful 'dummy' round trip proves the server is reachable
            dispatch(conn, None, 'dummy')
        finally:
            # one-shot connection: close it like the other helpers do
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is given it must be callable; it is run in the
        server process as `initializer(*initargs)` before the server is
        created.  Raises TypeError for a non-callable initializer.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists after start(); a manager that was merely
        # connect()ed has nothing to join.
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join/terminate below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and
        `method_to_typeid` fall back to the proxy type's `_exposed_` and
        `_method_to_typeid_` attributes.  When `create_method` is true a
        method named `typeid` is added to the class which creates a shared
        object and returns a proxy for it.
        '''
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items():
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # release the extra reference taken by Server.create() now
                # that the proxy exists; close the one-shot connection
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
651
652#
# Subclass of set which gets cleared after a fork
654#
655
class ProcessLocalSet(set):
    '''
    A set which registers itself to be emptied after a fork
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # contents are deliberately not pickled: unpickling builds a new,
        # empty instance
        return type(self), ()
661
662#
663# Definition of BaseProxy
664#
665
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread-local connection holder, set of ids)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        `authkey` falls back to the manager's key, then to the current
        process's key.  With `incref` true the referent's refcount is
        incremented on the server straight away.
        '''
        # explicit acquire/release: the lock type is only relied upon to
        # provide these two methods
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's long-lived connection to the manager
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referrent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy built with the proxy
        type registered for the returned token's typeid; any reply other
        than '#RETURN'/'#PROXY' is raised as an exception.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # drop the extra reference the server took when creating the
            # object, using a separate one-shot connection which is closed
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Increment the referent's refcount and register the matching decref
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Finalizer: release this process's reference to the referent
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
832
833#
834# Function used for unpickling
835#
836
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    process = current_process()
    server = getattr(process, '_manager_server', None)

    # Unpickling inside the manager server that owns the object: hand back
    # the real object rather than a proxy.
    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # 'incref' is always removed from kwds; it is honoured only when this
    # process is not flagged as `_inheriting`.
    requested = kwds.pop('incref', True)
    incref = requested and not getattr(current_process(), '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
853
854#
855# Functions to create proxies and proxy types
856#
857
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return an proxy type whose methods are given by `exposed`

    The result is a subclass of BaseProxy called `name` with one
    forwarding method per entry of `exposed`.  Types are memoised in
    `_cache` (a deliberately shared default argument) keyed on
    `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        # exec(code, namespace) is accepted both as the Python 2 exec
        # statement (tuple form) and as the Python 3 builtin.
        exec('''def %s(self, *args, **kwds):
    return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType
877 return ProxyType
878
879
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the list of methods is fetched from the server
    with a 'get_methods' request.  `authkey` falls back to the manager's
    key and then to the current process's key.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key *before* it is needed for the get_methods
    # connection below; previously a missing authkey was passed to the
    # client as None and only defaulted afterwards.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marks the instance so BaseProxy.__reduce__ pickles it as an AutoProxy
    proxy._isauto = True
    return proxy
904
905#
906# Types/callables which we will register with SyncManager
907#
908
class Namespace(object):
    '''
    Simple attribute container initialised from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # show only attributes whose names do not start with '_', sorted
        # by their formatted 'name=value' text
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in self.__dict__.items()
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(shown)
920
class Value(object):
    '''
    Holder for a single value together with its typecode

    The `lock` argument is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        details = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % details

    # attribute-style access on top of get()/set()
    value = property(get, set)
932
def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`; `lock` is accepted but unused
    '''
    result = array.array(typecode, sequence)
    return result
935
936#
937# Proxy types used by SyncManager
938#
939
class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding the iterator/generator methods to the referent
    '''
    # XXX remove methods for Py3.0 and Py2.6
    _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    # Each method below forwards its positional arguments unchanged to
    # the method of the same name on the referent.

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def next(self, *args):
        return self._callmethod('next', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
955
956
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release(); usable in a `with` block
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        call_args = (blocking,)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # acquire with the referent's default arguments
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
967
968
class ConditionProxy(AcquirerProxy):
    '''
    AcquirerProxy extended with wait(), notify() and notify_all()
    '''
    # XXX will Condition.notifyAll() name be available in Py3.0?
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
978
class EventProxy(BaseProxy):
    '''
    Proxy forwarding is_set(), set(), clear() and wait() to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)
989
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attribute access is forwarded to the referent

    Names starting with '_' are handled locally on the proxy itself; all
    other names are read, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # look `_callmethod` up via object to bypass this hook
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__delattr__', (key,))
        return object.__delattr__(self, key)
1007
1008
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding get()/set() to the referent, plus a `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        call_args = (value,)
        return self._callmethod('set', call_args)

    # attribute-style access on top of get()/set()
    value = property(get, set)
1016
1017
# Generated proxy type exposing the listed list methods; ListProxy below
# subclasses it to customise the in-place operators.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__delslice__',
    '__getitem__', '__getslice__', '__len__', '__mul__',
    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `proxy += value` extends the referent and keeps the proxy bound
        call_args = (value,)
        self._callmethod('extend', call_args)
        return self

    def __imul__(self, value):
        # `proxy *= value` is applied to the referent; the proxy is returned
        call_args = (value,)
        self._callmethod('__imul__', call_args)
        return self
1032
1033
# Generated proxy type exposing the listed dict methods.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))


# Generated proxy type exposing length, item and slice access.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0


# Generated proxy type exposing the listed pool methods.
PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# Results of these methods are registered under the given typeid and
# returned to the caller as proxies instead of being sent by value.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
1056
1057#
1058# Definition of SyncManager
1059#
1060
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.

    The class body is empty: the shared types are attached by the
    `SyncManager.register()` calls that follow the class definition.
    '''
1071
# Each call registers a typeid, the callable creating the shared object
# and (optionally) the proxy type; without one, register() uses AutoProxy.
SyncManager.register('Queue', Queue.Queue)
# NOTE: 'JoinableQueue' is backed by the same Queue.Queue class as 'Queue'
SyncManager.register('JoinableQueue', Queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (create_method=False: no creation method is added to SyncManager)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)