blob: d1522c2b8e02148a919d92abfa3bf8ae50e2ae2d [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
22
23from traceback import format_exc
24from multiprocessing import Process, current_process, active_children, Pool, util, connection
25from multiprocessing.process import AuthenticationString
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000026from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000027from multiprocessing.util import Finalize, info
28
29try:
30 from cPickle import PicklingError
31except ImportError:
32 from pickle import PicklingError
33
34#
Benjamin Petersone711caf2008-06-11 16:44:04 +000035# Register some things for pickling
36#
37
def reduce_array(a):
    '''
    Reduction function used to pickle `array.array` objects.

    Returns a `(callable, args)` pair which rebuilds an equal array from
    its typecode and its raw byte representation.
    '''
    # array.tostring() was deprecated in favour of tobytes() and later
    # removed; prefer tobytes() and only fall back on older interpreters.
    to_bytes = getattr(a, 'tobytes', None)
    if to_bytes is None:
        to_bytes = a.tostring
    return array.array, (a.typecode, to_bytes())
ForkingPickler.register(array.array, reduce_array)

# Register the dict view types so that they are pickled as plain lists.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list: # only needed in Py3.0
    def rebuild_as_list(obj):
        # Reduce a view to `list(<snapshot of its contents>)`.
        return list, (list(obj),)
    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
49#
50# Type for identifying shared objects
51#
52
class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The class is slotted, so the pickled state is an explicit tuple.
        return self.typeid, self.address, self.id

    def __setstate__(self, state):
        self.typeid, self.address, self.id = state

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
71
72#
73# Function for communication with a manager's server process
74#
75
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request to the manager over connection `c` and return the reply.

    The request is the tuple `(id, methodname, args, kwds)`.  A '#RETURN'
    reply yields its payload; any other reply kind is turned into an
    exception by `convert_to_error()` and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
85
def convert_to_error(kind, result):
    '''
    Map a non-'#RETURN' reply from the manager to an exception instance.

    The exception is returned, not raised; the caller raises it.
    '''
    if kind == '#ERROR':
        # The payload already is the exception raised on the server side.
        return result
    if kind == '#TRACEBACK':
        assert type(result) is str
        return RemoteError(result)
    if kind == '#UNSERIALIZABLE':
        assert type(result) is str
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
97
class RemoteError(Exception):
    '''
    Error carrying a textual report (first argument) sent by the manager
    '''
    def __str__(self):
        # Frame the remote report between two horizontal rules.
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
101
102#
103# Functions for finding the method names of an object
104#
105
def all_methods(obj):
    '''
    Return a list of the names of all callable attributes of `obj`
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
116
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] != '_':
            public.append(name)
    return public
122
123#
124# Server which is run in a process controlled by a manager
125#
126
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods which may be invoked through handle_request()
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener for `address` and the (initially empty) tables
        of shared objects; `registry` maps typeid to a tuple of
        (callable, exposed, method_to_typeid, proxytype).
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        # ident 0 is a placeholder entry which is never a real shared object
        self.id_to_obj = {0: (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever
        '''
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    # each accepted connection is handled in its own thread
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            assert funcname in self.public, '%r unrecognized' % funcname
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # best effort: try to report the failure to the client instead
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # the result is itself to be shared via a proxy
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # some names are served by the server on behalf of obj
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''Fallback for '#GETVALUE': return the referent itself'''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''Fallback for '__str__': return str() of the referent'''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''Fallback for '__repr__': return repr() of the referent'''
        return repr(obj)

    # Consulted by serve_client() when a request ends in AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''Do nothing; lets a client check that the server responds'''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            # Drop the placeholder ident 0 *before* sorting: the real idents
            # are strings and cannot be ordered against an int.
            idents = sorted(k for k in self.id_to_obj if k != 0)
            result = []
            for ident in idents:
                result.append(' %s: refcount=%s\n %s' %
                              (ident, self.id_to_refcount[ident],
                               str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident=0

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except BaseException:
                # deliberately catch everything: the process exits below
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # no factory registered: the single argument is the object
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                # None marks an object which has not been incref'ed yet
                self.id_to_refcount[ident] = None
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the shared object `ident`
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except TypeError:
                # first reference: the count is still the None placeholder
                assert self.id_to_refcount[ident] is None
                self.id_to_refcount[ident] = 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of the shared object `ident`,
        disposing of the object when the count reaches zero
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                # idents are hex strings, so they must not be logged with %d
                util.debug('disposing of obj with id %r', ident)
419
420#
421# Class to represent state of a manager
422#
423
class State(object):
    '''
    Holder for the life-cycle state of a manager; `value` is one of the
    class constants below
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
429
430#
431# Mapping from serializer name to Listener and Client types
432#
433
# Each value is a (Listener, Client) pair; the serializer name passed to
# Server, BaseManager and the proxies is used as the lookup key.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
438
439#
440# Definition of BaseManager
441#
442
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        Record the connection parameters; no process is started and no
        connection is made until start() or connect() is called.
        '''
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # NOTE(review): `from_address` is not defined anywhere in this
        # class as visible here -- confirm it exists before relying on
        # pickling of manager objects.
        return type(self).from_address, \
               (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            # the connection only probes the server, so release it afterwards
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer):
        '''
        Create a server, report its address and run it
        '''
        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        self._process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join()/terminate() below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # give each manager class its own registry instead of mutating
        # the one inherited from its base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            # add a factory method named `typeid` to the manager class
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
645
646#
# Subclass of set which gets cleared after a fork
648#
649
class ProcessLocalSet(set):
    '''
    Set which registers itself to be emptied after a fork and which
    pickles as a new, empty instance of its own type
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        return (type(self), ())
655
656#
657# Definition of BaseProxy
658#
659
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (thread local storage, ProcessLocalSet of idents)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        Create a proxy for the shared object identified by `token`.

        The authentication key is `authkey` if given, else the manager's
        key, else the key of the current process.  Unless `incref` is
        false the manager is told about the new reference.
        '''
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in
        thread local storage
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            # the result lives in the manager; wrap it in a proxy of the
            # type registered for its typeid
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            return proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager about a new reference to the referent and
        register the finalizer which releases it again
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # one-shot connection: do not leave it open until collected
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release the reference held on `token`; used as a finalizer
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    # one-shot connection: always release it
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        Re-register the reference in a forked child process
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        # only pass the key along while a child process is being spawned
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
823
824#
825# Function used for unpickling
826#
827
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    process = current_process()
    server = getattr(process, '_manager_server', None)

    # Unpickling inside the manager which owns the referent: hand back
    # the real object instead of a proxy.
    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    incref = (
        kwds.pop('incref', True) and
        not getattr(process, '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
844
845#
846# Functions to create proxies and proxy types
847#
848
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoized in `_cache`, keyed by `(name, exposed)`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # Generate one forwarding method per exposed name.
    namespace = {}
    template = '''def %s(self, *args, **kwds):
    return self._callmethod(%r, args, kwds)'''
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
869
870
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the manager at `token.address` is asked for the
    method names of the referent.  The authentication key is `authkey`
    if given, else the manager's key, else the key of the current process.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key *before* connecting, so that the get_methods request
    # below authenticates with the same key the proxy itself will use.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marks the proxy so that BaseProxy.__reduce__ rebuilds it via AutoProxy
    proxy._isauto = True
    return proxy
895
896#
897# Types/callables which we will register with SyncManager
898#
899
class Namespace(object):
    '''
    Simple attribute container initialised from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # Show the attributes not starting with '_', sorted by text.
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(shown)
911
class Value(object):
    '''
    Holder for a single value together with its typecode
    '''
    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class
        self._value = value
        self._typecode = typecode

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        details = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % details

    value = property(fget=get, fset=set)
923
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialised from `sequence`
    (`lock` is accepted but not used)
    '''
    result = array.array(typecode, sequence)
    return result
926
927#
928# Proxy types used by SyncManager
929#
930
class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding the iterator/generator protocol to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
943
944
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release(); usable in a `with` statement
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        call_args = (blocking,)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # forwards 'acquire' without arguments
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
955
956
class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding wait()/notify()/notify_all() to `AcquirerProxy`
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000965
class EventProxy(BaseProxy):
    '''
    Proxy forwarding is_set()/set()/clear()/wait() to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)
976
class NamespaceProxy(BaseProxy):
    '''
    Proxy forwarding attribute access to the referent.

    Names starting with '_' are handled locally on the proxy itself;
    all other names are forwarded.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__delattr__', (key,))
        return object.__delattr__(self, key)
994
995
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding get()/set(); also available as the `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        call_args = (value,)
        return self._callmethod('set', call_args)

    value = property(fget=get, fset=set)
1003
1004
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__delslice__',
    '__getitem__', '__getslice__', '__len__', '__mul__',
    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `+=` is forwarded as an extend() on the referent
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1019
1020
# Proxy type generated for dictionaries; each name listed is forwarded
# to the referent.
# NOTE(review): 'has_key' is listed although this file otherwise targets
# Python 3 -- confirm whether the referent types still provide it.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))


# Proxy type generated for arrays.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0


# Proxy type generated for pools.
PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# Results of these methods are returned as proxies of the given typeid
# rather than being copied back to the caller.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
1043
1044#
1045# Definition of SyncManager
1046#
1047
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# Each call registers a typeid with its factory and (optionally) its proxy
# type; when no proxy type is given, register() falls back to AutoProxy.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (create_method=False: no factory method is added to the manager for these)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)