blob: a460edc83df0896ea5c20a366b3e438d30e3f2f9 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
21import copyreg
22import queue
23
24from traceback import format_exc
25from multiprocessing import Process, current_process, active_children, Pool, util, connection
26from multiprocessing.process import AuthenticationString
27from multiprocessing.forking import exit, Popen, assert_spawning
28from multiprocessing.util import Finalize, info
29
30try:
31 from cPickle import PicklingError
32except ImportError:
33 from pickle import PicklingError
34
35#
Benjamin Petersone711caf2008-06-11 16:44:04 +000036# Register some things for pickling
37#
38
def reduce_array(a):
    '''
    Reduction function used to pickle `array.array` objects

    Returns the `(callable, args)` pair expected by `copyreg`; the array is
    rebuilt by calling `array.array(typecode, raw_data)`.
    '''
    # `tostring()` no longer exists on current Python 3 arrays; prefer
    # `tobytes()` and only fall back to the old spelling where it is missing.
    try:
        data = a.tobytes()
    except AttributeError:
        data = a.tostring()
    return array.array, (a.typecode, data)
copyreg.pickle(array.array, reduce_array)
42
# Register a reducer so that dictionary views are pickled as plain lists.
view_types = [type({}.items()), type({}.keys()), type({}.values())]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        '''
        Reduce `obj` to a `list` holding the same elements
        '''
        elements = list(obj)
        return list, (elements,)
    for view_type in view_types:
        copyreg.pickle(view_type, rebuild_as_list)
49
50#
51# Type for identifying shared objects
52#
53
class Token(object):
    '''
    Type to uniquely identify a shared object

    Holds three fields: `typeid`, `address` and `id`.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # the state is the plain 3-tuple of fields
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        self.typeid, self.address, self.id = state

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
72
73#
74# Function for communication with a manager's server process
75#
76
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request over connection `c` and return the manager's reply

    The request is the tuple `(id, methodname, args, kwds)`.  The payload of
    a `#RETURN` reply is returned; any other reply is converted to an
    exception and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
86
def convert_to_error(kind, result):
    '''
    Translate a reply of the given `kind` into an exception instance

    The exception is returned, not raised.
    '''
    if kind == '#ERROR':
        # the payload is already the exception to hand back
        return result
    if kind == '#TRACEBACK':
        assert type(result) is str
        return RemoteError(result)
    if kind == '#UNSERIALIZABLE':
        assert type(result) is str
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
98
class RemoteError(Exception):
    '''
    Exception whose string form frames its first argument between rules
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
102
103#
104# Functions for finding the method names of an object
105#
106
def all_methods(obj):
    '''
    Return a list of the names of the callable attributes of `obj`
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
117
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    names = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        names.append(name)
    return names
123
124#
125# Server which is run in a process controlled by a manager
126#
127
128class Server(object):
129 '''
130 Server class which runs in a process controlled by a manager object
131 '''
132 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
133 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
134
135 def __init__(self, registry, address, authkey, serializer):
136 assert isinstance(authkey, bytes)
137 self.registry = registry
138 self.authkey = AuthenticationString(authkey)
139 Listener, Client = listener_client[serializer]
140
141 # do authentication later
142 self.listener = Listener(address=address, backlog=5)
143 self.address = self.listener.address
144
145 self.id_to_obj = {0: (None, ())}
146 self.id_to_refcount = {}
147 self.mutex = threading.RLock()
148 self.stop = 0
149
150 def serve_forever(self):
151 '''
152 Run the server forever
153 '''
154 current_process()._manager_server = self
155 try:
156 try:
157 while 1:
158 try:
159 c = self.listener.accept()
160 except (OSError, IOError):
161 continue
162 t = threading.Thread(target=self.handle_request, args=(c,))
Benjamin Peterson672b8032008-06-11 19:14:14 +0000163 t.set_daemon(True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000164 t.start()
165 except (KeyboardInterrupt, SystemExit):
166 pass
167 finally:
168 self.stop = 999
169 self.listener.close()
170
171 def handle_request(self, c):
172 '''
173 Handle a new connection
174 '''
175 funcname = result = request = None
176 try:
177 connection.deliver_challenge(c, self.authkey)
178 connection.answer_challenge(c, self.authkey)
179 request = c.recv()
180 ignore, funcname, args, kwds = request
181 assert funcname in self.public, '%r unrecognized' % funcname
182 func = getattr(self, funcname)
183 except Exception:
184 msg = ('#TRACEBACK', format_exc())
185 else:
186 try:
187 result = func(c, *args, **kwds)
188 except Exception:
189 msg = ('#TRACEBACK', format_exc())
190 else:
191 msg = ('#RETURN', result)
192 try:
193 c.send(msg)
194 except Exception as e:
195 try:
196 c.send(('#TRACEBACK', format_exc()))
197 except Exception:
198 pass
199 util.info('Failure to send message: %r', msg)
200 util.info(' ... request was %r', request)
201 util.info(' ... exception was %r', e)
202
203 c.close()
204
205 def serve_client(self, conn):
206 '''
207 Handle requests from the proxies in a particular process/thread
208 '''
209 util.debug('starting server thread to service %r',
Benjamin Peterson672b8032008-06-11 19:14:14 +0000210 threading.current_thread().get_name())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000211
212 recv = conn.recv
213 send = conn.send
214 id_to_obj = self.id_to_obj
215
216 while not self.stop:
217
218 try:
219 methodname = obj = None
220 request = recv()
221 ident, methodname, args, kwds = request
222 obj, exposed, gettypeid = id_to_obj[ident]
223
224 if methodname not in exposed:
225 raise AttributeError(
226 'method %r of %r object is not in exposed=%r' %
227 (methodname, type(obj), exposed)
228 )
229
230 function = getattr(obj, methodname)
231
232 try:
233 res = function(*args, **kwds)
234 except Exception as e:
235 msg = ('#ERROR', e)
236 else:
237 typeid = gettypeid and gettypeid.get(methodname, None)
238 if typeid:
239 rident, rexposed = self.create(conn, typeid, res)
240 token = Token(typeid, self.address, rident)
241 msg = ('#PROXY', (rexposed, token))
242 else:
243 msg = ('#RETURN', res)
244
245 except AttributeError:
246 if methodname is None:
247 msg = ('#TRACEBACK', format_exc())
248 else:
249 try:
250 fallback_func = self.fallback_mapping[methodname]
251 result = fallback_func(
252 self, conn, ident, obj, *args, **kwds
253 )
254 msg = ('#RETURN', result)
255 except Exception:
256 msg = ('#TRACEBACK', format_exc())
257
258 except EOFError:
259 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson672b8032008-06-11 19:14:14 +0000260 threading.current_thread().get_name())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261 sys.exit(0)
262
263 except Exception:
264 msg = ('#TRACEBACK', format_exc())
265
266 try:
267 try:
268 send(msg)
269 except Exception as e:
270 send(('#UNSERIALIZABLE', repr(msg)))
271 except Exception as e:
272 util.info('exception in thread serving %r',
Benjamin Peterson672b8032008-06-11 19:14:14 +0000273 threading.current_thread().get_name())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000274 util.info(' ... message was %r', msg)
275 util.info(' ... exception was %r', e)
276 conn.close()
277 sys.exit(1)
278
279 def fallback_getvalue(self, conn, ident, obj):
280 return obj
281
282 def fallback_str(self, conn, ident, obj):
283 return str(obj)
284
285 def fallback_repr(self, conn, ident, obj):
286 return repr(obj)
287
288 fallback_mapping = {
289 '__str__':fallback_str,
290 '__repr__':fallback_repr,
291 '#GETVALUE':fallback_getvalue
292 }
293
294 def dummy(self, c):
295 pass
296
297 def debug_info(self, c):
298 '''
299 Return some info --- useful to spot problems with refcounting
300 '''
301 self.mutex.acquire()
302 try:
303 result = []
304 keys = list(self.id_to_obj.keys())
305 keys.sort()
306 for ident in keys:
307 if ident != 0:
308 result.append(' %s: refcount=%s\n %s' %
309 (ident, self.id_to_refcount[ident],
310 str(self.id_to_obj[ident][0])[:75]))
311 return '\n'.join(result)
312 finally:
313 self.mutex.release()
314
315 def number_of_objects(self, c):
316 '''
317 Number of shared objects
318 '''
319 return len(self.id_to_obj) - 1 # don't count ident=0
320
321 def shutdown(self, c):
322 '''
323 Shutdown this process
324 '''
325 try:
326 try:
327 util.debug('manager received shutdown message')
328 c.send(('#RETURN', None))
329
330 if sys.stdout != sys.__stdout__:
331 util.debug('resetting stdout, stderr')
332 sys.stdout = sys.__stdout__
333 sys.stderr = sys.__stderr__
334
335 util._run_finalizers(0)
336
337 for p in active_children():
338 util.debug('terminating a child process of manager')
339 p.terminate()
340
341 for p in active_children():
342 util.debug('terminating a child process of manager')
343 p.join()
344
345 util._run_finalizers()
346 util.info('manager exiting with exitcode 0')
347 except:
348 import traceback
349 traceback.print_exc()
350 finally:
351 exit(0)
352
353 def create(self, c, typeid, *args, **kwds):
354 '''
355 Create a new shared object and return its id
356 '''
357 self.mutex.acquire()
358 try:
359 callable, exposed, method_to_typeid, proxytype = \
360 self.registry[typeid]
361
362 if callable is None:
363 assert len(args) == 1 and not kwds
364 obj = args[0]
365 else:
366 obj = callable(*args, **kwds)
367
368 if exposed is None:
369 exposed = public_methods(obj)
370 if method_to_typeid is not None:
371 assert type(method_to_typeid) is dict
372 exposed = list(exposed) + list(method_to_typeid)
373
374 ident = '%x' % id(obj) # convert to string because xmlrpclib
375 # only has 32 bit signed integers
376 util.debug('%r callable returned object with id %r', typeid, ident)
377
378 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
379 if ident not in self.id_to_refcount:
380 self.id_to_refcount[ident] = None
381 return ident, tuple(exposed)
382 finally:
383 self.mutex.release()
384
385 def get_methods(self, c, token):
386 '''
387 Return the methods of the shared object indicated by token
388 '''
389 return tuple(self.id_to_obj[token.id][1])
390
391 def accept_connection(self, c, name):
392 '''
393 Spawn a new thread to serve this connection
394 '''
Benjamin Peterson672b8032008-06-11 19:14:14 +0000395 threading.current_thread().set_name(name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000396 c.send(('#RETURN', None))
397 self.serve_client(c)
398
399 def incref(self, c, ident):
400 self.mutex.acquire()
401 try:
402 try:
403 self.id_to_refcount[ident] += 1
404 except TypeError:
405 assert self.id_to_refcount[ident] is None
406 self.id_to_refcount[ident] = 1
407 finally:
408 self.mutex.release()
409
410 def decref(self, c, ident):
411 self.mutex.acquire()
412 try:
413 assert self.id_to_refcount[ident] >= 1
414 self.id_to_refcount[ident] -= 1
415 if self.id_to_refcount[ident] == 0:
416 del self.id_to_obj[ident], self.id_to_refcount[ident]
417 util.debug('disposing of obj with id %d', ident)
418 finally:
419 self.mutex.release()
420
421#
422# Class to represent state of a manager
423#
424
class State(object):
    '''
    Holder for a manager's state; `value` is one of the constants below
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
430
431#
432# Mapping from serializer name to Listener and Client types
433#
434
# serializer name -> (Listener type, Client type)
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
439
440#
441# Definition of BaseManager
442#
443
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        Record the connection parameters; no process is started here

        When `authkey` is None the current process's key is used.
        '''
        if authkey is None:
            authkey = current_process().get_authkey()
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # NOTE(review): no `from_address` attribute is defined on this class
        # in the visible source -- confirm that pickling a manager works.
        return type(self).from_address, \
               (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        # The connection is only used to check that the server answers, so
        # close it again instead of leaving it open.
        try:
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.set_name(type(self).__name__  + '-' + ident)
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer):
        '''
        Create a server, report its address and run it
        '''
        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        self._process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by `start()`
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: the process is terminated below if needed
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # give this class its own registry instead of mutating the one
        # inherited from a base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            # add a method named `typeid` which creates the shared object
            # and returns a proxy for it
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
646
647#
# Subclass of set which gets cleared after a fork
649#
650
class ProcessLocalSet(set):
    '''
    Set which registers itself to be emptied after a fork
    '''
    def __init__(self):
        def _clear_after_fork(obj):
            obj.clear()
        util.register_after_fork(self, _clear_after_fork)

    def __reduce__(self):
        # reduces to a fresh, empty instance of the same type
        return type(self), ()
656
657#
658# Definition of BaseProxy
659#
660
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread-local storage, ProcessLocalSet of ids)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        Create a proxy for the shared object identified by `token`

        The key used is `authkey` if given, else the manager's key, else
        the current process's key.  When `incref` is true the server is
        asked to increment the object's reference count.
        '''
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().get_authkey()

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in the tls
        '''
        util.debug('making connection to manager')
        name = current_process().get_name()
        if threading.current_thread().get_name() != 'MainThread':
            name += '|' + threading.current_thread().get_name()
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().get_name())
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            # the result is itself a shared object: wrap it in a proxy
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            return proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the server about this reference and register the matching decref
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        # the connection is only needed for this one request: close it
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release the reference recorded for `token`

        Registered as a finalizer by `_incref`; failures to reach the
        manager are logged, not raised.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                # close the one-shot connection even if the request fails
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().get_name())
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
824
825#
826# Function used for unpickling
827#
828
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    server = getattr(current_process(), '_manager_server', None)

    # inside the server which owns the object: return the object itself
    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    wants_incref = kwds.pop('incref', True)
    incref = (
        wants_incref and
        not getattr(current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
845
846#
847# Functions to create proxies and proxy types
848#
849
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are cached in `_cache`, keyed by `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # build one forwarding method per exposed name
    namespace = {}
    for meth in exposed:
        exec('''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
870
871
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None the method names are requested from the server
    at `token.address`.  When `authkey` is None the manager's key is used
    if a manager is given, else the current process's key.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key *before* talking to the server so that the
    # `get_methods` request below is made with the same key as the proxy.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().get_authkey()

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marks the proxy so that `__reduce__` rebuilds it through AutoProxy
    proxy._isauto = True
    return proxy
896
897#
898# Types/callables which we will register with SyncManager
899#
900
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes whose name starts with '_' are left out
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(shown)
912
class Value(object):
    '''
    Holder for a single value together with its typecode
    '''
    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used here
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        details = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % details

    value = property(get, set)
924
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialised from `sequence`

    `lock` is accepted but not used here.
    '''
    result = array.array(typecode, sequence)
    return result
927
928#
929# Proxy types used by SyncManager
930#
931
class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding the iterator/generator protocol to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
944
945
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire()` and `release()` methods

    Also usable in a `with` statement.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        args = (blocking,)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
956
957
class ConditionProxy(AcquirerProxy):
    '''
    Acquirer proxy which also forwards `wait`, `notify` and `notify_all`
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        args = (timeout,)
        return self._callmethod('wait', args)

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000966
class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait` to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        args = (timeout,)
        return self._callmethod('wait', args)
977
class NamespaceProxy(BaseProxy):
    '''
    Proxy forwarding attribute access to the referent

    Names starting with '_' are handled on the proxy itself; all others
    are forwarded.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__delattr__', (key,))
        return object.__delattr__(self, key)
995
996
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get` and `set`; also available as property `value`
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        args = (value,)
        return self._callmethod('set', args)

    value = property(get, set)
1004
1005
# Proxy type with one forwarding method per name listed below
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__delslice__',
        '__getitem__', '__getslice__', '__len__', '__mul__',
        '__reversed__', '__rmul__', '__setitem__', '__setslice__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
    )                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # forwarded as `extend`
        args = (value,)
        self._callmethod('extend', args)
        return self

    def __imul__(self, value):
        args = (value,)
        self._callmethod('__imul__', args)
        return self
1020
1021
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
    )


ArrayProxy = MakeProxyType(
    'ArrayProxy',
    (
        '__len__', '__getitem__', '__setitem__', '__getslice__',
        '__setslice__',
    ),
    )                  # XXX __getslice__ and __setslice__ unneeded in Py3.0


PoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'terminate',
    ),
    )
# methods whose results are returned as proxies of the given typeid
PoolProxy._method_to_typeid_ = dict(
    apply_async='AsyncResult',
    map_async='AsyncResult',
    imap='Iterator',
    imap_unordered='Iterator',
    )
1044
1045#
1046# Definition of SyncManager
1047#
1048
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The registered types are the ones meant for synchronizing threads,
    together with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''
1059
# types using the default proxy type
SyncManager.register('Queue', callable=queue.Queue)
SyncManager.register('JoinableQueue', callable=queue.Queue)

# types with a dedicated proxy type
SyncManager.register('Event', callable=threading.Event,
                     proxytype=EventProxy)
SyncManager.register('Lock', callable=threading.Lock,
                     proxytype=AcquirerProxy)
SyncManager.register('RLock', callable=threading.RLock,
                     proxytype=AcquirerProxy)
SyncManager.register('Semaphore', callable=threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', callable=threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', callable=threading.Condition,
                     proxytype=ConditionProxy)
SyncManager.register('Pool', callable=Pool, proxytype=PoolProxy)
SyncManager.register('list', callable=list, proxytype=ListProxy)
SyncManager.register('dict', callable=dict, proxytype=DictProxy)
SyncManager.register('Value', callable=Value, proxytype=ValueProxy)
SyncManager.register('Array', callable=Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', callable=Namespace,
                     proxytype=NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)