#
# Module providing the `SyncManager` class for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#
9
# Names exported by `from multiprocessing.managers import *`
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
22
23from traceback import format_exc
Jesse Nollerf70a5382009-01-18 19:44:02 +000024from pickle import PicklingError
Benjamin Petersone711caf2008-06-11 16:44:04 +000025from multiprocessing import Process, current_process, active_children, Pool, util, connection
26from multiprocessing.process import AuthenticationString
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000027from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000028from multiprocessing.util import Finalize, info
29
Benjamin Petersone711caf2008-06-11 16:44:04 +000030#
Benjamin Petersone711caf2008-06-11 16:44:04 +000031# Register some things for pickling
32#
33
def reduce_array(a):
    '''
    Return the `(callable, args)` pair used to pickle the array `a`

    The array is rebuilt on the receiving side by calling `array.array`
    with the typecode of `a` and a copy of its raw byte contents.
    '''
    # array.tostring() is a deprecated alias which newer interpreters no
    # longer provide; prefer tobytes() and only fall back to tostring() on
    # interpreters which predate it.
    try:
        to_bytes = a.tobytes
    except AttributeError:
        to_bytes = a.tostring
    return array.array, (a.typecode, to_bytes())
ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
# The view objects returned by dict.items(), dict.keys() and dict.values()
# are registered below so that they get pickled as plain lists.
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # only needed in Py3.0
    import copyreg

    def rebuild_as_list(obj):
        # reduce `obj` to a list holding a snapshot of its elements
        return list, (list(obj),)

    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
        copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
47#
48# Type for identifying shared objects
49#
50
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token records the `typeid` the object was registered under, the
    `address` stored alongside it and the `id` of the object.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.__setstate__((typeid, address, id))

    def __getstate__(self):
        # there is no instance __dict__ (see __slots__), so the pickled
        # state is spelled out as a plain tuple
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        fields = self.__getstate__()
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
69
70#
71# Function for communication with a manager's server process
72#
73
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a request over connection `c` and return the reply's payload

    The request is the tuple `(id, methodname, args, kwds)`.  A '#RETURN'
    reply yields its payload; any other kind of reply is turned into an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
83
def convert_to_error(kind, result):
    '''
    Return the exception which represents a failure reply

    `kind` is the tag of the reply and `result` its payload: an '#ERROR'
    payload is returned unchanged, '#TRACEBACK' and '#UNSERIALIZABLE'
    payloads (strings) are wrapped in a RemoteError, and any other kind
    gives a ValueError.
    '''
    if kind == '#ERROR':
        return result
    if kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        assert type(result) is str
        if kind == '#TRACEBACK':
            return RemoteError(result)
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
95
class RemoteError(Exception):
    '''
    Exception whose first argument is text to be shown between two rules
    '''
    def __str__(self):
        rule = '-'*75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
99
100#
101# Functions for finding the method names of an object
102#
103
def all_methods(obj):
    '''
    Return a list of the names in `dir(obj)` whose values can be called
    '''
    return [name for name in dir(obj)
            if hasattr(getattr(obj, name), '__call__')]
114
def public_methods(obj):
    '''
    Return the names given by all_methods(obj) which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] != '_':
            public.append(name)
    return public
120
121#
122# Server which is run in a process controlled by a manager
123#
124
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    Shared objects are stored in `id_to_obj` under a string identifier as
    `(obj, exposed, method_to_typeid)` triples, with a matching reference
    count in `id_to_refcount`.  create(), incref(), decref() and
    debug_info() hold `mutex` while they use these mappings.
    '''
    # names of the methods which handle_request() is willing to call
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Start listening on `address`

        `registry` maps each typeid to a tuple
        `(callable, exposed, method_to_typeid, proxytype)`, `authkey` must
        be a bytes object and `serializer` is a key of `listener_client`.
        '''
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=5)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever

        Every accepted connection is handed to handle_request() in a new
        daemon thread.  KeyboardInterrupt and SystemExit end the loop, after
        which `stop` is set and the listener is closed.
        '''
        # lets RebuildProxy() recognise that it is running in this process
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the connection, reads one request of the form
        `(ignored, funcname, args, kwds)`, calls the method of this server
        named `funcname` (which must be listed in `public`) and sends back
        either `('#RETURN', result)` or `('#TRACEBACK', text)`.  The
        connection is closed afterwards.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # This check decides which methods a client may call, so it is
            # an explicit test rather than an `assert` statement: asserts
            # are removed when Python runs with -O.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # try to report the failure to the client before logging it
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Each request is a tuple `(ident, methodname, args, kwds)`.  The
        reply is one of `('#RETURN', value)`, `('#PROXY', (exposed, token))`,
        `('#ERROR', exception)` or `('#TRACEBACK', text)`; if the reply
        cannot be sent, `('#UNSERIALIZABLE', repr_of_reply)` is sent
        instead.  The loop runs until `stop` is set, or exits the thread on
        EOF or when no reply at all can be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # methods listed in method_to_typeid have their result
                    # registered as a new shared object and returned as a
                    # token instead of by value
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is not exposed or does not exist; see
                    # whether one of the fallback functions handles it
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Return the shared object itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Return str() of the shared object
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Return repr() of the shared object
        '''
        return repr(obj)

    # functions tried by serve_client() when a normal method call fails
    # with AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj):
                if ident != '0':
                    result.append('  %s:       refcount=%s\n    %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request, restores stdout/stderr, runs finalizers,
        terminates and joins the active child processes and then exits
        with code 0, whether or not any of these steps failed.
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately catches everything: the failure is reported
                # and the process still exits below
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of the method
        names which may be called on the object.  If the callable
        registered for `typeid` is None, the single positional argument is
        itself used as the shared object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                     self.registry[typeid]

            if factory is None:
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident`

        The object is forgotten once its reference count reaches zero.
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                # ident is a string (see create()), so it must not be
                # formatted with %d
                util.debug('disposing of obj with id %r', ident)
419
420#
421# Class to represent state of a manager
422#
423
class State(object):
    '''
    Holder for the state of a manager, stored in the `value` attribute
    '''
    __slots__ = ['value']

    # the values which `value` is set to elsewhere in this module
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
429
430#
431# Mapping from serializer name to Listener and Client types
432#
433
# Each serializer name maps to the pair `(Listener type, Client type)`
# used to make connections for that serializer.
listener_client = {
    'pickle': (
        connection.Listener,
        connection.Client,
        ),
    'xmlrpclib': (
        connection.XmlListener,
        connection.XmlClient,
        ),
    }
438
439#
440# Definition of BaseManager
441#
442
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns a server process with start() or attaches to
    an existing one with connect().  Types are made available through the
    register() classmethod.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        `address` is the address of the server, `authkey` the key used to
        authenticate connections (defaults to the authkey of the current
        process) and `serializer` a key of `listener_client`.
        '''
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # Historically this referred to a `from_address` classmethod, which
        # this class does not define.  Use it when a subclass provides one;
        # otherwise fall back to the constructor, whose positional
        # parameters are (address, authkey, serializer).
        rebuild = getattr(type(self), 'from_address', type(self))
        return rebuild, (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the connection is only needed for this one request
            conn.close()
        self._state.value = State.STARTED

    def start(self):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer):
        '''
        Create a server, report its address and run it
        '''
        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists once start() has been called
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception as e:
                # best effort only: the process is terminated below if it
                # is still alive
                util.debug('... shutdown message failed %s', e)

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `callable` creates the shared objects, `proxytype` is the proxy
        type used for them (AutoProxy if omitted), `exposed` lists the
        method names proxies may call and `method_to_typeid` maps method
        names to the typeid of the proxy their result is returned as.  If
        `create_method` is true, a method named `typeid` which creates a
        shared object and returns a proxy for it is added to the class.
        '''
        # give this class its own registry rather than modifying the one
        # inherited from a base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the proxy now exists, so drop the extra reference taken
                # by the 'create' request
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
647
648#
649# Subclass of set which get cleared after a fork
650#
651
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # reduces to a call of the type with no arguments
        cls = type(self)
        return cls, ()
657
658#
659# Definition of BaseProxy
660#
661
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # maps a manager address to a pair (thread local storage, set of ids);
    # access is guarded by `_mutex`
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        `token` identifies the referent and `serializer` is a key of
        `listener_client`.  The authkey is taken from `authkey`, else from
        `manager`, else from the current process.  If `incref` is true the
        manager is asked to increment the referent's reference count.
        '''
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in
        `self._tls.connection`
        '''
        util.debug('making connection to manager')
        name = current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'accept_connection', (name,))
        except Exception:
            # nothing else holds the connection yet, so do not leak it
            conn.close()
            raise
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is returned as a proxy for the new shared object;
        any failure reply is raised as an exception.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # the proxy now exists, so drop the extra reference which the
            # server took when it created the object
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Ask the manager to increment the referent's reference count and
        register a finalizer (`self._close`) which calls _decref()
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # the connection is only needed for this one request
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Tell the manager that a reference to the referent has gone

        Failures are logged rather than raised.  When `idset` becomes
        empty the calling thread's connection (if any) is closed.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s', e)

    def __reduce__(self):
        kwds = {}
        # the authkey is only included while a process is being spawned
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
828
829#
830# Function used for unpickling
831#
832
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When running inside the manager server which owns `token`, the shared
    object itself is returned; otherwise `func` is called to build a proxy.
    '''
    process = current_process()
    server = getattr(process, '_manager_server', None)

    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # note that 'incref' is removed from the caller's `kwds`
    wanted = kwds.pop('incref', True)
    incref = wanted and not getattr(process, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
849
850#
851# Functions to create proxies and proxy types
852#
853
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    The types are cached in `_cache` (which deliberately persists between
    calls), so asking again for the same `name` and `exposed` returns the
    same type.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # each exposed name becomes a method forwarding to _callmethod()
    namespace = {}
    for meth in exposed:
        exec('''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
874
875
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    If `exposed` is None the manager at `token.address` is asked for the
    methods of the referent.  The authkey is taken from `authkey`, else
    from `manager`, else from the current process.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before it is first needed, so that the
    # 'get_methods' request below authenticates with the same key as the
    # proxy itself instead of with an unresolved None.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marks the proxy so that it is pickled as an AutoProxy
    proxy._isauto = True
    return proxy
900
901#
902# Types/callables which we will register with SyncManager
903#
904
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes starting with an underscore are left out
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(shown)
916
class Value(object):
    '''
    Holder for a typecode and a single value

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)'%(cls_name, self._typecode, self._value)

    value = property(get, set)
928
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of type `typecode` initialised from `sequence`

    The `lock` argument is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
931
932#
933# Proxy types used by SyncManager
934#
935
class IteratorProxy(BaseProxy):
    '''
    Proxy which forwards the iterator methods to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
948
949
class AcquirerProxy(BaseProxy):
    '''
    Proxy for referents with `acquire` and `release` methods

    The proxy can be used in a `with` statement, which acquires the
    referent on entry and releases it on exit.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        call_args = (blocking,)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
960
961
class ConditionProxy(AcquirerProxy):
    '''
    AcquirerProxy which also forwards `wait`, `notify` and `notify_all`
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970
class EventProxy(BaseProxy):
    '''
    Proxy which forwards `is_set`, `set`, `clear` and `wait` to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        call_args = (timeout,)
        return self._callmethod('wait', call_args)
981
class NamespaceProxy(BaseProxy):
    '''
    Proxy which forwards attribute access to the referent

    Names starting with an underscore are handled on the proxy itself; all
    other names are read, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # fetch _callmethod with object.__getattribute__ so that the
            # lookup does not go through this method again
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            callmethod = object.__getattribute__(self, '_callmethod')
            return callmethod('__delattr__', (key,))
        return object.__delattr__(self, key)
999
1000
class ValueProxy(BaseProxy):
    '''
    Proxy which forwards `get` and `set` to the referent

    The `value` property reads through get() and writes through set().
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        call_args = (value,)
        return self._callmethod('set', call_args)

    value = property(get, set)
1008
1009
# Proxy type with one forwarding method for each of the names below
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__delslice__',
        '__getitem__', '__getslice__', '__len__', '__mul__',
        '__reversed__', '__rmul__', '__setitem__', '__setslice__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
    )                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
class ListProxy(BaseListProxy):
    '''
    BaseListProxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `+=` is carried out by calling the referent's extend()
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # return the proxy rather than the result of the remote call
        self._callmethod('__imul__', (value,))
        return self
1024
1025
# Proxy type with one forwarding method for each of the names below
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
    )
1031
1032
# Proxy type with one forwarding method for each of the names below
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    (
        '__len__', '__getitem__', '__setitem__',
        '__getslice__', '__setslice__',
    ),
    )                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1036
1037
# Proxy type with one forwarding method for each of the names below
PoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'terminate',
    ),
    )
# typeids under which the results of these methods are shared
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
    }
1048
1049#
1050# Definition of SyncManager
1051#
1052
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
    # The class body is intentionally empty: the shared types are added
    # through calls to `SyncManager.register()` made after the class
    # statement.
1063
# Types for which SyncManager gets a creation method of the same name.
# A proxy type of None means that the default proxy type is used.
for _typeid, _factory, _proxytype in (
        ('Queue', queue.Queue, None),
        ('JoinableQueue', queue.Queue, None),
        ('Event', threading.Event, EventProxy),
        ('Lock', threading.Lock, AcquirerProxy),
        ('RLock', threading.RLock, AcquirerProxy),
        ('Semaphore', threading.Semaphore, AcquirerProxy),
        ('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy),
        ('Condition', threading.Condition, ConditionProxy),
        ('Pool', Pool, PoolProxy),
        ('list', list, ListProxy),
        ('dict', dict, DictProxy),
        ('Value', Value, ValueProxy),
        ('Array', Array, ArrayProxy),
        ('Namespace', Namespace, NamespaceProxy),
        ):
    SyncManager.register(_typeid, _factory, _proxytype)
del _typeid, _factory, _proxytype

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)