blob: f895d625fdd9d5b06e7582920642dd49914c1f4a [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
22
23from traceback import format_exc
24from multiprocessing import Process, current_process, active_children, Pool, util, connection
25from multiprocessing.process import AuthenticationString
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000026from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000027from multiprocessing.util import Finalize, info
28
29try:
30 from cPickle import PicklingError
31except ImportError:
32 from pickle import PicklingError
33
34#
Benjamin Petersone711caf2008-06-11 16:44:04 +000035# Register some things for pickling
36#
37
def reduce_array(a):
    '''
    Reduction function used to pickle `array.array` instances

    Returns the `(callable, args)` pair used by the pickling machinery:
    calling `array.array(typecode, raw_bytes)` rebuilds an equal array.
    '''
    # `tostring()` was only a deprecated alias of `tobytes()` and no longer
    # exists on current Python 3 releases; fall back to it only when the
    # running interpreter has no `tobytes()`.
    try:
        data = a.tobytes()
    except AttributeError:
        data = a.tostring()
    return array.array, (a.typecode, data)
# Teach ForkingPickler how to pickle arrays
ForkingPickler.register(array.array, reduce_array)

# The concrete types returned by dict.items(), dict.keys() and dict.values()
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # only needed in Py3.0
    import copyreg

    def rebuild_as_list(obj):
        '''
        Reduction function which pickles a dict view as a plain list
        '''
        return list, (list(obj),)

    # register the reduction both with ForkingPickler and with the
    # ordinary pickle machinery (via copyreg)
    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
        copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000050
51#
52# Type for identifying shared objects
53#
54
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token records the `typeid` the object was created with, the
    `address` of the manager which holds it and the object's `id`.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.__setstate__((typeid, address, id))

    def __getstate__(self):
        # there is no __dict__ (because of __slots__), so the pickled
        # state is an explicit tuple
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
73
74#
75# Function for communication with a manager's server process
76#
77
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request sent is the tuple `(id, methodname, args, kwds)`.  The
    payload of a '#RETURN' reply is returned; any other kind of reply is
    converted to an exception by `convert_to_error()` and raised.
    '''
    # NB the `kwds` default is shared between calls, but it is only ever
    # sent, never modified
    request = (id, methodname, args, kwds)
    c.send(request)
    reply = c.recv()
    kind, result = reply
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
87
def convert_to_error(kind, result):
    '''
    Return (without raising) the exception described by a reply

    '#ERROR' replies already carry an exception object, which is returned
    unchanged.  '#TRACEBACK' and '#UNSERIALIZABLE' replies carry a string
    which is wrapped in a `RemoteError`.  Any other `kind` produces a
    `ValueError`.
    '''
    if kind == '#ERROR':
        return result
    if kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        assert type(result) is str
        if kind == '#TRACEBACK':
            return RemoteError(result)
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
99
class RemoteError(Exception):
    '''
    Exception whose first argument is text describing a remote failure

    `str()` shows that text framed by two rules of 75 dashes.
    '''
    def __str__(self):
        rule = '-' * 75
        text = str(self.args[0])
        return '\n' + rule + '\n' + text + rule
103
104#
105# Functions for finding the method names of an object
106#
107
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name counts as a method if the attribute it refers to has a
    `__call__` attribute.  Names are in the order given by `dir(obj)`.
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
118
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
124
125#
126# Server which is run in a process controlled by a manager
127#
128
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    Every accepted connection is authenticated and must first name one of
    the methods listed in `public`.  A connection which asks for
    `accept_connection` is then used to serve method calls on the shared
    objects recorded in `id_to_obj`.
    '''
    # names of the methods a client may ask handle_request() to run
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener for the server

        `registry` maps each typeid to a tuple
        `(callable, exposed, method_to_typeid, proxytype)`; `authkey` must
        be bytes; `serializer` is a key of `listener_client`.
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        # ident 0 is a placeholder; real idents are strings (see create())
        self.id_to_obj = {0: (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever
        '''
        current_process()._manager_server = self
        try:
            try:
                while True:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    # each connection is handled in its own daemon thread
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            # a true value makes the serve_client() loops terminate
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, runs the requested public method and
        sends back either ('#RETURN', result) or ('#TRACEBACK', text).
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit check rather than an `assert` statement: this is the
            # whitelist guarding which methods a client may run, so it must
            # not disappear when Python runs with -O.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # sending failed (e.g. unpicklable result); report why instead
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Each request is `(ident, methodname, args, kwds)`.  The reply is
        '#RETURN' with the result, '#PROXY' when the method is listed in
        the object's method-to-typeid mapping, '#ERROR' with the exception
        raised by the method, or '#TRACEBACK' for any other failure.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # share the result itself and return a token for it
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    # the failure happened before the request was decoded
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # not exposed/defined: try the built-in fallbacks
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # used by serve_client() for method names which raise AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            # The placeholder ident 0 is an int while real idents are
            # strings, so it has to be dropped *before* sorting: comparing
            # int with str raises TypeError.
            idents = sorted(ident for ident in self.id_to_obj if ident != 0)
            result = []
            for ident in idents:
                result.append(' %s: refcount=%s\n %s' %
                              (ident, self.id_to_refcount[ident],
                               str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident=0

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately catches everything: the process exits below
                # whatever happens, so just report the problem
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of the
        method names which may be called on the object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # no callable registered: the object itself was passed in
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                # None means "created but not yet incref'ed"
                self.id_to_refcount[ident] = None
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except TypeError:
                # first reference: create() stored None as the count
                assert self.id_to_refcount[ident] is None
                self.id_to_refcount[ident] = 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident`,
        disposing of the object when the count reaches zero
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                # idents are strings (see create()), so format with %r
                util.debug('disposing of obj with id %r', ident)
421
422#
423# Class to represent state of a manager
424#
425
class State(object):
    '''
    Holds the state of a manager in its single `value` attribute

    `value` takes one of the class constants below: a manager sets
    INITIAL in `__init__()`, STARTED in `connect()`/`start()` and
    SHUTDOWN in its finalizer.
    '''
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2
431
432#
433# Mapping from serializer name to Listener and Client types
434#
435
# Each value is a `(Listener, Client)` pair; the key is the `serializer`
# argument accepted by Server, BaseManager and the proxy classes.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
440
441#
442# Definition of BaseManager
443#
444
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns a server process (`start()`) or attaches to
    an existing one (`connect()`).  Shared object types are added to a
    manager class with `register()`.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        `authkey` defaults to the authkey of the current process;
        `serializer` must be a key of `listener_client`.
        '''
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        cls = type(self)
        # This class defines no `from_address`, so unconditionally
        # referring to it made every attempt to pickle a manager raise
        # AttributeError.  Use such an alternate constructor only if a
        # subclass provides one; otherwise rebuild through the class
        # itself, whose __init__ takes the same three arguments.
        rebuild = getattr(cls, 'from_address', cls)
        return rebuild, (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        # the connection is only needed for this one round trip, so close
        # it afterwards like the other one-shot requests of this class
        try:
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer):
        '''
        Create a server, report its address and run it
        '''
        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists once start() has run; a manager which was
        # never started (or only connect()-ed) has nothing to join
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: the process is terminated below if needed
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `exposed` and `method_to_typeid` default to the `_exposed_` and
        `_method_to_typeid_` attributes of `proxytype` (if any).  When
        `create_method` is true a method named `typeid` is added to the
        class; it creates a shared object and returns a proxy for it.
        '''
        # give this class its own registry rather than sharing (and
        # modifying) the one inherited from a base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
647
648#
# Subclass of set which gets cleared after a fork
650#
651
class ProcessLocalSet(set):
    '''
    A set which starts empty, is emptied by an after-fork hook and
    always pickles as an empty set of the same type
    '''
    def __init__(self):
        def _empty(obj):
            obj.clear()
        util.register_after_fork(self, _empty)

    def __reduce__(self):
        # the contents are deliberately not part of the pickled state
        return type(self), ()
657
658#
659# Definition of BaseProxy
660#
661
class BaseProxy(object):
    '''
    A base for proxies of shared objects

    A proxy forwards method calls over a per-thread connection to the
    manager which holds the referent, and keeps the referent alive via
    the manager's incref/decref requests.
    '''
    # maps a manager address to a (ForkAwareLocal, ProcessLocalSet) pair
    # shared by all proxies for that address
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # explicit authkey, else the manager's, else the current process's
        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in
        `self._tls.connection`
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy using the proxy type
        registered with this proxy's manager.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            # NOTE(review): this needs self._manager, which is None for a
            # proxy built without a manager or after _after_fork() -- confirm
            # whether '#PROXY' replies can reach such proxies
            proxytype = self._manager._registry[token.typeid][-1]
            return proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager this process holds a reference to the referent
        and register the finalizer which gives the reference up again
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        # one-shot request: close the connection once the reply has
        # arrived instead of leaving it to garbage collection
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                # one-shot request: always release the connection
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        # the authkey is only included while a process is being spawned
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
825
826#
827# Function used for unpickling
828#
829
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If the current process is running the manager server which owns the
    object, the shared object itself is returned.  Otherwise `func` is
    called to build a proxy; no incref is done if `kwds` says so or while
    the current process is marked as `_inheriting`.
    '''
    process = current_process()
    server = getattr(process, '_manager_server', None)

    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # always remove 'incref' from kwds so it is not passed to func twice
    wanted = kwds.pop('incref', True)
    incref = wanted and not getattr(current_process(), '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
846
847#
848# Functions to create proxies and proxy types
849#
850
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    The type is a subclass of `BaseProxy` named `name`; each generated
    method forwards its arguments to `_callmethod()`.  Types are cached
    by `(name, exposed)` in `_cache`, which is shared between calls.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    # source template for one forwarding method
    template = '''def %s(self, *args, **kwds):
 return self._callmethod(%r, args, kwds)'''

    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
871
872
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the manager is asked which methods the referent
    has.  `authkey` defaults to the manager's authkey or, failing that,
    to the authkey of the current process.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey *before* it is needed: the get_methods request
    # below must be authenticated with the same key as the proxy itself.
    # Previously the raw argument (possibly None) was used for it.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marks the proxy so that BaseProxy.__reduce__ pickles it as an AutoProxy
    proxy._isauto = True
    return proxy
897
898#
899# Types/callables which we will register with SyncManager
900#
901
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes whose names start with '_' are not shown
        shown = sorted('%s=%r' % (name, value)
                       for name, value in list(self.__dict__.items())
                       if not name.startswith('_'))
        return 'Namespace(%s)' % ', '.join(shown)
913
class Value(object):
    '''
    Holder for a single value together with a typecode

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        details = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % details

    # attribute-style access to get()/set()
    value = property(get, set)
925
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of type `typecode` initialised from `sequence`

    The `lock` argument is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
928
929#
930# Proxy types used by SyncManager
931#
932
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator/generator held by the manager
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        result = self._callmethod('__next__', args)
        return result

    def send(self, *args):
        result = self._callmethod('send', args)
        return result

    def throw(self, *args):
        result = self._callmethod('throw', args)
        return result

    def close(self, *args):
        result = self._callmethod('close', args)
        return result
945
946
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release() methods; also usable in
    a `with` statement
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        arguments = (blocking,)
        return self._callmethod('acquire', arguments)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # acquire with the referent's default arguments
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
957
958
class ConditionProxy(AcquirerProxy):
    '''
    Proxy which adds wait()/notify()/notify_all() to `AcquirerProxy`
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        arguments = (timeout,)
        return self._callmethod('wait', arguments)

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000967
class EventProxy(BaseProxy):
    '''
    Proxy exposing is_set()/set()/clear()/wait() of the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        arguments = (timeout,)
        return self._callmethod('wait', arguments)
978
class NamespaceProxy(BaseProxy):
    '''
    Proxy which forwards attribute access to the referent

    Names starting with '_' are handled locally on the proxy itself;
    all other names are got, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # fetch _callmethod via object to avoid recursing into here
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__delattr__', (key,))
        return object.__delattr__(self, key)
996
997
class ValueProxy(BaseProxy):
    '''
    Proxy exposing get()/set() of the referent, also as a `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        arguments = (value,)
        return self._callmethod('set', arguments)

    # attribute-style access to get()/set()
    value = property(get, set)
1005
1006
# Proxy type with one forwarding method (see MakeProxyType) for each of
# the method names listed here
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__delslice__',
    '__getitem__', '__getslice__', '__len__', '__mul__',
    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself rather
    than the value returned by the remote call
    '''
    def __iadd__(self, value):
        arguments = (value,)
        self._callmethod('extend', arguments)
        return self

    def __imul__(self, value):
        arguments = (value,)
        self._callmethod('__imul__', arguments)
        return self
1021
1022
# Proxy type with one forwarding method (see MakeProxyType) for each of
# the method names listed here
# NOTE(review): 'has_key' is not a dict method on Python 3 -- confirm
# whether it should stay in this list
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
1028
1029
# Proxy type with one forwarding method (see MakeProxyType) for each of
# the method names listed here
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1033
1034
# Proxy type with one forwarding method (see MakeProxyType) for each of
# the method names listed here
PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# Maps a method name to the typeid used for its result; such results
# come back as proxies.  BaseManager.register() reads this attribute
# when no `method_to_typeid` argument is given.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
1045
1046#
1047# Definition of SyncManager
1048#
1049
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.

    The class body is empty: the types are added by the
    `SyncManager.register()` calls which follow the class definition.
    '''
1060
# Each call registers a typeid, the callable which creates the object and
# (optionally) the proxy type; without a proxy type register() falls back
# to AutoProxy.
SyncManager.register('Queue', queue.Queue)
# 'JoinableQueue' is registered with the same callable as 'Queue'
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (no callable and create_method=False, so no creation method is added)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)