blob: 44f749b1aee893c2dc12850cb62aed63133981c5 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8#
9
10__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11
12#
13# Imports
14#
15
16import os
17import sys
18import weakref
19import threading
20import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000021import queue
22
23from traceback import format_exc
Jesse Nollerf70a5382009-01-18 19:44:02 +000024from pickle import PicklingError
Benjamin Petersone711caf2008-06-11 16:44:04 +000025from multiprocessing import Process, current_process, active_children, Pool, util, connection
26from multiprocessing.process import AuthenticationString
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000027from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000028from multiprocessing.util import Finalize, info
29
Benjamin Petersone711caf2008-06-11 16:44:04 +000030#
Benjamin Petersone711caf2008-06-11 16:44:04 +000031# Register some things for pickling
32#
33
34def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000035 return array.array, (a.typecode, a.tobytes())
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000036ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000039if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000040 def rebuild_as_list(obj):
41 return list, (list(obj),)
42 for view_type in view_types:
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000043 ForkingPickler.register(view_type, rebuild_as_list)
Amaury Forgeot d'Arcd757e732008-08-20 08:58:40 +000044 import copyreg
45 copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
47#
48# Type for identifying shared objects
49#
50
51class Token(object):
52 '''
53 Type to uniquely indentify a shared object
54 '''
55 __slots__ = ('typeid', 'address', 'id')
56
57 def __init__(self, typeid, address, id):
58 (self.typeid, self.address, self.id) = (typeid, address, id)
59
60 def __getstate__(self):
61 return (self.typeid, self.address, self.id)
62
63 def __setstate__(self, state):
64 (self.typeid, self.address, self.id) = state
65
66 def __repr__(self):
67 return 'Token(typeid=%r, address=%r, id=%r)' % \
68 (self.typeid, self.address, self.id)
69
70#
71# Function for communication with a manager's server process
72#
73
74def dispatch(c, id, methodname, args=(), kwds={}):
75 '''
76 Send a message to manager using connection `c` and return response
77 '''
78 c.send((id, methodname, args, kwds))
79 kind, result = c.recv()
80 if kind == '#RETURN':
81 return result
82 raise convert_to_error(kind, result)
83
84def convert_to_error(kind, result):
85 if kind == '#ERROR':
86 return result
87 elif kind == '#TRACEBACK':
88 assert type(result) is str
89 return RemoteError(result)
90 elif kind == '#UNSERIALIZABLE':
91 assert type(result) is str
92 return RemoteError('Unserializable message: %s\n' % result)
93 else:
94 return ValueError('Unrecognized message type')
95
96class RemoteError(Exception):
97 def __str__(self):
98 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
99
100#
101# Functions for finding the method names of an object
102#
103
104def all_methods(obj):
105 '''
106 Return a list of names of methods of `obj`
107 '''
108 temp = []
109 for name in dir(obj):
110 func = getattr(obj, name)
111 if hasattr(func, '__call__'):
112 temp.append(name)
113 return temp
114
115def public_methods(obj):
116 '''
117 Return a list of names of methods of `obj` which do not start with '_'
118 '''
119 return [name for name in all_methods(obj) if name[0] != '_']
120
121#
122# Server which is run in a process controlled by a manager
123#
124
125class Server(object):
126 '''
127 Server class which runs in a process controlled by a manager object
128 '''
129 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
130 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
131
132 def __init__(self, registry, address, authkey, serializer):
133 assert isinstance(authkey, bytes)
134 self.registry = registry
135 self.authkey = AuthenticationString(authkey)
136 Listener, Client = listener_client[serializer]
137
138 # do authentication later
139 self.listener = Listener(address=address, backlog=5)
140 self.address = self.listener.address
141
Jesse Noller63b3a972009-01-21 02:15:48 +0000142 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000143 self.id_to_refcount = {}
144 self.mutex = threading.RLock()
145 self.stop = 0
146
147 def serve_forever(self):
148 '''
149 Run the server forever
150 '''
151 current_process()._manager_server = self
152 try:
153 try:
154 while 1:
155 try:
156 c = self.listener.accept()
157 except (OSError, IOError):
158 continue
159 t = threading.Thread(target=self.handle_request, args=(c,))
Benjamin Petersonfae4c622008-08-18 18:40:08 +0000160 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161 t.start()
162 except (KeyboardInterrupt, SystemExit):
163 pass
164 finally:
165 self.stop = 999
166 self.listener.close()
167
168 def handle_request(self, c):
169 '''
170 Handle a new connection
171 '''
172 funcname = result = request = None
173 try:
174 connection.deliver_challenge(c, self.authkey)
175 connection.answer_challenge(c, self.authkey)
176 request = c.recv()
177 ignore, funcname, args, kwds = request
178 assert funcname in self.public, '%r unrecognized' % funcname
179 func = getattr(self, funcname)
180 except Exception:
181 msg = ('#TRACEBACK', format_exc())
182 else:
183 try:
184 result = func(c, *args, **kwds)
185 except Exception:
186 msg = ('#TRACEBACK', format_exc())
187 else:
188 msg = ('#RETURN', result)
189 try:
190 c.send(msg)
191 except Exception as e:
192 try:
193 c.send(('#TRACEBACK', format_exc()))
194 except Exception:
195 pass
196 util.info('Failure to send message: %r', msg)
197 util.info(' ... request was %r', request)
198 util.info(' ... exception was %r', e)
199
200 c.close()
201
202 def serve_client(self, conn):
203 '''
204 Handle requests from the proxies in a particular process/thread
205 '''
206 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000207 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000208
209 recv = conn.recv
210 send = conn.send
211 id_to_obj = self.id_to_obj
212
213 while not self.stop:
214
215 try:
216 methodname = obj = None
217 request = recv()
218 ident, methodname, args, kwds = request
219 obj, exposed, gettypeid = id_to_obj[ident]
220
221 if methodname not in exposed:
222 raise AttributeError(
223 'method %r of %r object is not in exposed=%r' %
224 (methodname, type(obj), exposed)
225 )
226
227 function = getattr(obj, methodname)
228
229 try:
230 res = function(*args, **kwds)
231 except Exception as e:
232 msg = ('#ERROR', e)
233 else:
234 typeid = gettypeid and gettypeid.get(methodname, None)
235 if typeid:
236 rident, rexposed = self.create(conn, typeid, res)
237 token = Token(typeid, self.address, rident)
238 msg = ('#PROXY', (rexposed, token))
239 else:
240 msg = ('#RETURN', res)
241
242 except AttributeError:
243 if methodname is None:
244 msg = ('#TRACEBACK', format_exc())
245 else:
246 try:
247 fallback_func = self.fallback_mapping[methodname]
248 result = fallback_func(
249 self, conn, ident, obj, *args, **kwds
250 )
251 msg = ('#RETURN', result)
252 except Exception:
253 msg = ('#TRACEBACK', format_exc())
254
255 except EOFError:
256 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000257 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258 sys.exit(0)
259
260 except Exception:
261 msg = ('#TRACEBACK', format_exc())
262
263 try:
264 try:
265 send(msg)
266 except Exception as e:
267 send(('#UNSERIALIZABLE', repr(msg)))
268 except Exception as e:
269 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000270 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271 util.info(' ... message was %r', msg)
272 util.info(' ... exception was %r', e)
273 conn.close()
274 sys.exit(1)
275
276 def fallback_getvalue(self, conn, ident, obj):
277 return obj
278
279 def fallback_str(self, conn, ident, obj):
280 return str(obj)
281
282 def fallback_repr(self, conn, ident, obj):
283 return repr(obj)
284
285 fallback_mapping = {
286 '__str__':fallback_str,
287 '__repr__':fallback_repr,
288 '#GETVALUE':fallback_getvalue
289 }
290
291 def dummy(self, c):
292 pass
293
294 def debug_info(self, c):
295 '''
296 Return some info --- useful to spot problems with refcounting
297 '''
298 self.mutex.acquire()
299 try:
300 result = []
301 keys = list(self.id_to_obj.keys())
302 keys.sort()
303 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000304 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000305 result.append(' %s: refcount=%s\n %s' %
306 (ident, self.id_to_refcount[ident],
307 str(self.id_to_obj[ident][0])[:75]))
308 return '\n'.join(result)
309 finally:
310 self.mutex.release()
311
312 def number_of_objects(self, c):
313 '''
314 Number of shared objects
315 '''
Jesse Noller63b3a972009-01-21 02:15:48 +0000316 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000317
318 def shutdown(self, c):
319 '''
320 Shutdown this process
321 '''
322 try:
323 try:
324 util.debug('manager received shutdown message')
325 c.send(('#RETURN', None))
326
327 if sys.stdout != sys.__stdout__:
328 util.debug('resetting stdout, stderr')
329 sys.stdout = sys.__stdout__
330 sys.stderr = sys.__stderr__
331
332 util._run_finalizers(0)
333
334 for p in active_children():
335 util.debug('terminating a child process of manager')
336 p.terminate()
337
338 for p in active_children():
339 util.debug('terminating a child process of manager')
340 p.join()
341
342 util._run_finalizers()
343 util.info('manager exiting with exitcode 0')
344 except:
345 import traceback
346 traceback.print_exc()
347 finally:
348 exit(0)
349
350 def create(self, c, typeid, *args, **kwds):
351 '''
352 Create a new shared object and return its id
353 '''
354 self.mutex.acquire()
355 try:
356 callable, exposed, method_to_typeid, proxytype = \
357 self.registry[typeid]
358
359 if callable is None:
360 assert len(args) == 1 and not kwds
361 obj = args[0]
362 else:
363 obj = callable(*args, **kwds)
364
365 if exposed is None:
366 exposed = public_methods(obj)
367 if method_to_typeid is not None:
368 assert type(method_to_typeid) is dict
369 exposed = list(exposed) + list(method_to_typeid)
370
371 ident = '%x' % id(obj) # convert to string because xmlrpclib
372 # only has 32 bit signed integers
373 util.debug('%r callable returned object with id %r', typeid, ident)
374
375 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
376 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000377 self.id_to_refcount[ident] = 0
378 # increment the reference count immediately, to avoid
379 # this object being garbage collected before a Proxy
380 # object for it can be created. The caller of create()
381 # is responsible for doing a decref once the Proxy object
382 # has been created.
383 self.incref(c, ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 return ident, tuple(exposed)
385 finally:
386 self.mutex.release()
387
388 def get_methods(self, c, token):
389 '''
390 Return the methods of the shared object indicated by token
391 '''
392 return tuple(self.id_to_obj[token.id][1])
393
394 def accept_connection(self, c, name):
395 '''
396 Spawn a new thread to serve this connection
397 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000398 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000399 c.send(('#RETURN', None))
400 self.serve_client(c)
401
402 def incref(self, c, ident):
403 self.mutex.acquire()
404 try:
Jesse Noller824f4f32008-09-02 19:12:20 +0000405 self.id_to_refcount[ident] += 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000406 finally:
407 self.mutex.release()
408
409 def decref(self, c, ident):
410 self.mutex.acquire()
411 try:
412 assert self.id_to_refcount[ident] >= 1
413 self.id_to_refcount[ident] -= 1
414 if self.id_to_refcount[ident] == 0:
415 del self.id_to_obj[ident], self.id_to_refcount[ident]
Benjamin Peterson4ac9ce42009-10-04 14:49:41 +0000416 util.debug('disposing of obj with id %r', ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000417 finally:
418 self.mutex.release()
419
420#
421# Class to represent state of a manager
422#
423
424class State(object):
425 __slots__ = ['value']
426 INITIAL = 0
427 STARTED = 1
428 SHUTDOWN = 2
429
430#
431# Mapping from serializer name to Listener and Client types
432#
433
434listener_client = {
435 'pickle' : (connection.Listener, connection.Client),
436 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
437 }
438
439#
440# Definition of BaseManager
441#
442
443class BaseManager(object):
444 '''
445 Base class for managers
446 '''
447 _registry = {}
448 _Server = Server
449
450 def __init__(self, address=None, authkey=None, serializer='pickle'):
451 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000452 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000453 self._address = address # XXX not final address if eg ('', 0)
454 self._authkey = AuthenticationString(authkey)
455 self._state = State()
456 self._state.value = State.INITIAL
457 self._serializer = serializer
458 self._Listener, self._Client = listener_client[serializer]
459
460 def __reduce__(self):
461 return type(self).from_address, \
462 (self._address, self._authkey, self._serializer)
463
464 def get_server(self):
465 '''
466 Return server object with serve_forever() method and address attribute
467 '''
468 assert self._state.value == State.INITIAL
469 return Server(self._registry, self._address,
470 self._authkey, self._serializer)
471
472 def connect(self):
473 '''
474 Connect manager object to the server process
475 '''
476 Listener, Client = listener_client[self._serializer]
477 conn = Client(self._address, authkey=self._authkey)
478 dispatch(conn, None, 'dummy')
479 self._state.value = State.STARTED
480
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000481 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000482 '''
483 Spawn a server process for this manager object
484 '''
485 assert self._state.value == State.INITIAL
486
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000487 if initializer is not None and not hasattr(initializer, '__call__'):
488 raise TypeError('initializer must be a callable')
489
Benjamin Petersone711caf2008-06-11 16:44:04 +0000490 # pipe over which we will retrieve address of server
491 reader, writer = connection.Pipe(duplex=False)
492
493 # spawn process which runs a server
494 self._process = Process(
495 target=type(self)._run_server,
496 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000497 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000498 )
499 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000500 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000501 self._process.start()
502
503 # get address of server
504 writer.close()
505 self._address = reader.recv()
506 reader.close()
507
508 # register a finalizer
509 self._state.value = State.STARTED
510 self.shutdown = util.Finalize(
511 self, type(self)._finalize_manager,
512 args=(self._process, self._address, self._authkey,
513 self._state, self._Client),
514 exitpriority=0
515 )
516
517 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000518 def _run_server(cls, registry, address, authkey, serializer, writer,
519 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000520 '''
521 Create a server, report its address and run it
522 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000523 if initializer is not None:
524 initializer(*initargs)
525
Benjamin Petersone711caf2008-06-11 16:44:04 +0000526 # create server
527 server = cls._Server(registry, address, authkey, serializer)
528
529 # inform parent process of the server's address
530 writer.send(server.address)
531 writer.close()
532
533 # run the manager
534 util.info('manager serving at %r', server.address)
535 server.serve_forever()
536
537 def _create(self, typeid, *args, **kwds):
538 '''
539 Create a new shared object; return the token and exposed tuple
540 '''
541 assert self._state.value == State.STARTED, 'server not yet started'
542 conn = self._Client(self._address, authkey=self._authkey)
543 try:
544 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
545 finally:
546 conn.close()
547 return Token(typeid, self._address, id), exposed
548
549 def join(self, timeout=None):
550 '''
551 Join the manager process (if it has been spawned)
552 '''
553 self._process.join(timeout)
554
555 def _debug_info(self):
556 '''
557 Return some info about the servers shared objects and connections
558 '''
559 conn = self._Client(self._address, authkey=self._authkey)
560 try:
561 return dispatch(conn, None, 'debug_info')
562 finally:
563 conn.close()
564
565 def _number_of_objects(self):
566 '''
567 Return the number of shared objects
568 '''
569 conn = self._Client(self._address, authkey=self._authkey)
570 try:
571 return dispatch(conn, None, 'number_of_objects')
572 finally:
573 conn.close()
574
575 def __enter__(self):
576 return self
577
578 def __exit__(self, exc_type, exc_val, exc_tb):
579 self.shutdown()
580
581 @staticmethod
582 def _finalize_manager(process, address, authkey, state, _Client):
583 '''
584 Shutdown the manager process; will be registered as a finalizer
585 '''
586 if process.is_alive():
587 util.info('sending shutdown message to manager')
588 try:
589 conn = _Client(address, authkey=authkey)
590 try:
591 dispatch(conn, None, 'shutdown')
592 finally:
593 conn.close()
594 except Exception:
595 pass
596
597 process.join(timeout=0.2)
598 if process.is_alive():
599 util.info('manager still alive')
600 if hasattr(process, 'terminate'):
601 util.info('trying to `terminate()` manager process')
602 process.terminate()
603 process.join(timeout=0.1)
604 if process.is_alive():
605 util.info('manager still alive after terminate')
606
607 state.value = State.SHUTDOWN
608 try:
609 del BaseProxy._address_to_local[address]
610 except KeyError:
611 pass
612
613 address = property(lambda self: self._address)
614
615 @classmethod
616 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
617 method_to_typeid=None, create_method=True):
618 '''
619 Register a typeid with the manager type
620 '''
621 if '_registry' not in cls.__dict__:
622 cls._registry = cls._registry.copy()
623
624 if proxytype is None:
625 proxytype = AutoProxy
626
627 exposed = exposed or getattr(proxytype, '_exposed_', None)
628
629 method_to_typeid = method_to_typeid or \
630 getattr(proxytype, '_method_to_typeid_', None)
631
632 if method_to_typeid:
633 for key, value in list(method_to_typeid.items()):
634 assert type(key) is str, '%r is not a string' % key
635 assert type(value) is str, '%r is not a string' % value
636
637 cls._registry[typeid] = (
638 callable, exposed, method_to_typeid, proxytype
639 )
640
641 if create_method:
642 def temp(self, *args, **kwds):
643 util.debug('requesting creation of a shared %r object', typeid)
644 token, exp = self._create(typeid, *args, **kwds)
645 proxy = proxytype(
646 token, self._serializer, manager=self,
647 authkey=self._authkey, exposed=exp
648 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000649 conn = self._Client(token.address, authkey=self._authkey)
650 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000651 return proxy
652 temp.__name__ = typeid
653 setattr(cls, typeid, temp)
654
655#
656# Subclass of set which get cleared after a fork
657#
658
659class ProcessLocalSet(set):
660 def __init__(self):
661 util.register_after_fork(self, lambda obj: obj.clear())
662 def __reduce__(self):
663 return type(self), ()
664
665#
666# Definition of BaseProxy
667#
668
669class BaseProxy(object):
670 '''
671 A base for proxies of shared objects
672 '''
673 _address_to_local = {}
674 _mutex = util.ForkAwareThreadLock()
675
676 def __init__(self, token, serializer, manager=None,
677 authkey=None, exposed=None, incref=True):
678 BaseProxy._mutex.acquire()
679 try:
680 tls_idset = BaseProxy._address_to_local.get(token.address, None)
681 if tls_idset is None:
682 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
683 BaseProxy._address_to_local[token.address] = tls_idset
684 finally:
685 BaseProxy._mutex.release()
686
687 # self._tls is used to record the connection used by this
688 # thread to communicate with the manager at token.address
689 self._tls = tls_idset[0]
690
691 # self._idset is used to record the identities of all shared
692 # objects for which the current process owns references and
693 # which are in the manager at token.address
694 self._idset = tls_idset[1]
695
696 self._token = token
697 self._id = self._token.id
698 self._manager = manager
699 self._serializer = serializer
700 self._Client = listener_client[serializer][1]
701
702 if authkey is not None:
703 self._authkey = AuthenticationString(authkey)
704 elif self._manager is not None:
705 self._authkey = self._manager._authkey
706 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000707 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000708
709 if incref:
710 self._incref()
711
712 util.register_after_fork(self, BaseProxy._after_fork)
713
714 def _connect(self):
715 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000716 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000717 if threading.current_thread().name != 'MainThread':
718 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000719 conn = self._Client(self._token.address, authkey=self._authkey)
720 dispatch(conn, None, 'accept_connection', (name,))
721 self._tls.connection = conn
722
723 def _callmethod(self, methodname, args=(), kwds={}):
724 '''
725 Try to call a method of the referrent and return a copy of the result
726 '''
727 try:
728 conn = self._tls.connection
729 except AttributeError:
730 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000731 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000732 self._connect()
733 conn = self._tls.connection
734
735 conn.send((self._id, methodname, args, kwds))
736 kind, result = conn.recv()
737
738 if kind == '#RETURN':
739 return result
740 elif kind == '#PROXY':
741 exposed, token = result
742 proxytype = self._manager._registry[token.typeid][-1]
Jesse Noller824f4f32008-09-02 19:12:20 +0000743 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000744 token, self._serializer, manager=self._manager,
745 authkey=self._authkey, exposed=exposed
746 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000747 conn = self._Client(token.address, authkey=self._authkey)
748 dispatch(conn, None, 'decref', (token.id,))
749 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000750 raise convert_to_error(kind, result)
751
752 def _getvalue(self):
753 '''
754 Get a copy of the value of the referent
755 '''
756 return self._callmethod('#GETVALUE')
757
758 def _incref(self):
759 conn = self._Client(self._token.address, authkey=self._authkey)
760 dispatch(conn, None, 'incref', (self._id,))
761 util.debug('INCREF %r', self._token.id)
762
763 self._idset.add(self._id)
764
765 state = self._manager and self._manager._state
766
767 self._close = util.Finalize(
768 self, BaseProxy._decref,
769 args=(self._token, self._authkey, state,
770 self._tls, self._idset, self._Client),
771 exitpriority=10
772 )
773
774 @staticmethod
775 def _decref(token, authkey, state, tls, idset, _Client):
776 idset.discard(token.id)
777
778 # check whether manager is still alive
779 if state is None or state.value == State.STARTED:
780 # tell manager this process no longer cares about referent
781 try:
782 util.debug('DECREF %r', token.id)
783 conn = _Client(token.address, authkey=authkey)
784 dispatch(conn, None, 'decref', (token.id,))
785 except Exception as e:
786 util.debug('... decref failed %s', e)
787
788 else:
789 util.debug('DECREF %r -- manager already shutdown', token.id)
790
791 # check whether we can close this thread's connection because
792 # the process owns no more references to objects for this manager
793 if not idset and hasattr(tls, 'connection'):
794 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000795 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000796 tls.connection.close()
797 del tls.connection
798
799 def _after_fork(self):
800 self._manager = None
801 try:
802 self._incref()
803 except Exception as e:
804 # the proxy may just be for a manager which has shutdown
805 util.info('incref failed: %s' % e)
806
807 def __reduce__(self):
808 kwds = {}
809 if Popen.thread_is_spawning():
810 kwds['authkey'] = self._authkey
811
812 if getattr(self, '_isauto', False):
813 kwds['exposed'] = self._exposed_
814 return (RebuildProxy,
815 (AutoProxy, self._token, self._serializer, kwds))
816 else:
817 return (RebuildProxy,
818 (type(self), self._token, self._serializer, kwds))
819
820 def __deepcopy__(self, memo):
821 return self._getvalue()
822
823 def __repr__(self):
824 return '<%s object, typeid %r at %s>' % \
825 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
826
827 def __str__(self):
828 '''
829 Return representation of the referent (or a fall-back if that fails)
830 '''
831 try:
832 return self._callmethod('__repr__')
833 except Exception:
834 return repr(self)[:-1] + "; '__str__()' failed>"
835
836#
837# Function used for unpickling
838#
839
840def RebuildProxy(func, token, serializer, kwds):
841 '''
842 Function used for unpickling proxy objects.
843
844 If possible the shared object is returned, or otherwise a proxy for it.
845 '''
846 server = getattr(current_process(), '_manager_server', None)
847
848 if server and server.address == token.address:
849 return server.id_to_obj[token.id][0]
850 else:
851 incref = (
852 kwds.pop('incref', True) and
853 not getattr(current_process(), '_inheriting', False)
854 )
855 return func(token, serializer, incref=incref, **kwds)
856
857#
858# Functions to create proxies and proxy types
859#
860
861def MakeProxyType(name, exposed, _cache={}):
862 '''
863 Return an proxy type whose methods are given by `exposed`
864 '''
865 exposed = tuple(exposed)
866 try:
867 return _cache[(name, exposed)]
868 except KeyError:
869 pass
870
871 dic = {}
872
873 for meth in exposed:
874 exec('''def %s(self, *args, **kwds):
875 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
876
877 ProxyType = type(name, (BaseProxy,), dic)
878 ProxyType._exposed_ = exposed
879 _cache[(name, exposed)] = ProxyType
880 return ProxyType
881
882
883def AutoProxy(token, serializer, manager=None, authkey=None,
884 exposed=None, incref=True):
885 '''
886 Return an auto-proxy for `token`
887 '''
888 _Client = listener_client[serializer][1]
889
890 if exposed is None:
891 conn = _Client(token.address, authkey=authkey)
892 try:
893 exposed = dispatch(conn, None, 'get_methods', (token,))
894 finally:
895 conn.close()
896
897 if authkey is None and manager is not None:
898 authkey = manager._authkey
899 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000900 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000901
902 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
903 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
904 incref=incref)
905 proxy._isauto = True
906 return proxy
907
908#
909# Types/callables which we will register with SyncManager
910#
911
912class Namespace(object):
913 def __init__(self, **kwds):
914 self.__dict__.update(kwds)
915 def __repr__(self):
916 items = list(self.__dict__.items())
917 temp = []
918 for name, value in items:
919 if not name.startswith('_'):
920 temp.append('%s=%r' % (name, value))
921 temp.sort()
922 return 'Namespace(%s)' % str.join(', ', temp)
923
924class Value(object):
925 def __init__(self, typecode, value, lock=True):
926 self._typecode = typecode
927 self._value = value
928 def get(self):
929 return self._value
930 def set(self, value):
931 self._value = value
932 def __repr__(self):
933 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
934 value = property(get, set)
935
936def Array(typecode, sequence, lock=True):
937 return array.array(typecode, sequence)
938
939#
940# Proxy types used by SyncManager
941#
942
943class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000944 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000945 def __iter__(self):
946 return self
947 def __next__(self, *args):
948 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000949 def send(self, *args):
950 return self._callmethod('send', args)
951 def throw(self, *args):
952 return self._callmethod('throw', args)
953 def close(self, *args):
954 return self._callmethod('close', args)
955
956
957class AcquirerProxy(BaseProxy):
958 _exposed_ = ('acquire', 'release')
959 def acquire(self, blocking=True):
960 return self._callmethod('acquire', (blocking,))
961 def release(self):
962 return self._callmethod('release')
963 def __enter__(self):
964 return self._callmethod('acquire')
965 def __exit__(self, exc_type, exc_val, exc_tb):
966 return self._callmethod('release')
967
968
969class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000970 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000971 def wait(self, timeout=None):
972 return self._callmethod('wait', (timeout,))
973 def notify(self):
974 return self._callmethod('notify')
975 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000976 return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000977
978class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000979 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000980 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000981 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000982 def set(self):
983 return self._callmethod('set')
984 def clear(self):
985 return self._callmethod('clear')
986 def wait(self, timeout=None):
987 return self._callmethod('wait', (timeout,))
988
989class NamespaceProxy(BaseProxy):
990 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
991 def __getattr__(self, key):
992 if key[0] == '_':
993 return object.__getattribute__(self, key)
994 callmethod = object.__getattribute__(self, '_callmethod')
995 return callmethod('__getattribute__', (key,))
996 def __setattr__(self, key, value):
997 if key[0] == '_':
998 return object.__setattr__(self, key, value)
999 callmethod = object.__getattribute__(self, '_callmethod')
1000 return callmethod('__setattr__', (key, value))
1001 def __delattr__(self, key):
1002 if key[0] == '_':
1003 return object.__delattr__(self, key)
1004 callmethod = object.__getattribute__(self, '_callmethod')
1005 return callmethod('__delattr__', (key,))
1006
1007
1008class ValueProxy(BaseProxy):
1009 _exposed_ = ('get', 'set')
1010 def get(self):
1011 return self._callmethod('get')
1012 def set(self, value):
1013 return self._callmethod('set', (value,))
1014 value = property(get, set)
1015
1016
1017BaseListProxy = MakeProxyType('BaseListProxy', (
1018 '__add__', '__contains__', '__delitem__', '__delslice__',
1019 '__getitem__', '__getslice__', '__len__', '__mul__',
1020 '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1021 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1022 'reverse', 'sort', '__imul__'
1023 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1024class ListProxy(BaseListProxy):
1025 def __iadd__(self, value):
1026 self._callmethod('extend', (value,))
1027 return self
1028 def __imul__(self, value):
1029 self._callmethod('__imul__', (value,))
1030 return self
1031
1032
1033DictProxy = MakeProxyType('DictProxy', (
1034 '__contains__', '__delitem__', '__getitem__', '__len__',
1035 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1036 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1037 ))
1038
1039
1040ArrayProxy = MakeProxyType('ArrayProxy', (
1041 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1042 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1043
1044
1045PoolProxy = MakeProxyType('PoolProxy', (
1046 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1047 'map', 'map_async', 'terminate'
1048 ))
1049PoolProxy._method_to_typeid_ = {
1050 'apply_async': 'AsyncResult',
1051 'map_async': 'AsyncResult',
1052 'imap': 'Iterator',
1053 'imap_unordered': 'Iterator'
1054 }
1055
1056#
1057# Definition of SyncManager
1058#
1059
1060class SyncManager(BaseManager):
1061 '''
1062 Subclass of `BaseManager` which supports a number of shared object types.
1063
1064 The types registered are those intended for the synchronization
1065 of threads, plus `dict`, `list` and `Namespace`.
1066
1067 The `multiprocessing.Manager()` function creates started instances of
1068 this class.
1069 '''
1070
1071SyncManager.register('Queue', queue.Queue)
1072SyncManager.register('JoinableQueue', queue.Queue)
1073SyncManager.register('Event', threading.Event, EventProxy)
1074SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1075SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1076SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1077SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1078 AcquirerProxy)
1079SyncManager.register('Condition', threading.Condition, ConditionProxy)
1080SyncManager.register('Pool', Pool, PoolProxy)
1081SyncManager.register('list', list, ListProxy)
1082SyncManager.register('dict', dict, DictProxy)
1083SyncManager.register('Value', Value, ValueProxy)
1084SyncManager.register('Array', Array, ArrayProxy)
1085SyncManager.register('Namespace', Namespace, NamespaceProxy)
1086
1087# types returned by methods of PoolProxy
1088SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1089SyncManager.register('AsyncResult', create_method=False)