blob: 36bcf8f07a204157cc3fbfab86b2af71d763c957 [file] [log] [blame]
#
# Module providing the `SyncManager` class for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
10
# Names exported by `from multiprocessing.managers import *`.
__all__ = ['BaseManager', 'SyncManager', 'BaseProxy', 'Token']
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
22from traceback import format_exc
23from multiprocessing import Process, current_process, active_children, Pool, util, connection
24from multiprocessing.process import AuthenticationString
Florent Xicluna04842a82011-11-11 20:05:50 +010025from multiprocessing.forking import exit, Popen, ForkingPickler
Charles-François Natalic8ce7152012-04-17 18:45:57 +020026from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028#
Benjamin Petersone711caf2008-06-11 16:44:04 +000029# Register some things for pickling
30#
31
def reduce_array(a):
    '''
    Reduction function for `array.array` objects

    Returns a `(callable, args)` pair; calling `callable(*args)` rebuilds an
    array with the same typecode and contents as `a`.
    '''
    return array.array, (a.typecode, a.tobytes())
ForkingPickler.register(array.array, reduce_array)

# Dictionary views cannot be pickled directly, so they are sent as lists.
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # only needed in Py3.0
    import copyreg

    def rebuild_as_list(obj):
        '''
        Reduction function which pickles the iterable `obj` as a plain list
        '''
        return list, (list(obj),)

    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
        copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
45#
46# Type for identifying shared objects
47#
48
class Token(object):
    '''
    Type to uniquely identify a shared object

    Holds the `typeid` the object was registered under, the `address` of
    the manager server which owns it, and the object's `id` on that server.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        (self.typeid, self.address, self.id) = (typeid, address, id)

    def __getstate__(self):
        # __slots__ classes have no __dict__, so pickle state is a tuple
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        return 'Token(typeid=%r, address=%r, id=%r)' % \
               (self.typeid, self.address, self.id)
67
68#
69# Function for communication with a manager's server process
70#
71
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple `(id, methodname, args, kwds)`.  A '#RETURN'
    reply yields its payload; any other reply kind is converted to an
    exception by `convert_to_error()` and raised.
    '''
    # NOTE: `kwds` is only ever read here, so the shared default dict is
    # never mutated.
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)
80 raise convert_to_error(kind, result)
81
def convert_to_error(kind, result):
    '''
    Return (not raise) the exception corresponding to a failure reply

    '#ERROR' carries the exception object itself; '#TRACEBACK' and
    '#UNSERIALIZABLE' carry a string which is wrapped in `RemoteError`;
    anything else gives a `ValueError`.
    '''
    if kind == '#ERROR':
        return result
    if kind == '#TRACEBACK':
        assert type(result) is str
        return RemoteError(result)
    if kind == '#UNSERIALIZABLE':
        assert type(result) is str
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
92 return ValueError('Unrecognized message type')
93
class RemoteError(Exception):
    '''
    Exception whose first argument is text describing a remote failure

    `str()` frames that text between two 75-character dashed rules.
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
97
98#
99# Functions for finding the method names of an object
100#
101
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name counts as a method when the attribute it refers to is callable.
    Names come back in `dir(obj)` order.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
111 return temp
112
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    return [name for name in all_methods(obj) if not name.startswith('_')]
117 return [name for name in all_methods(obj) if name[0] != '_']
118
119#
120# Server which is run in a process controlled by a manager
121#
122
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # names of the methods which may be invoked through handle_request()
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever
        '''
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    # each accepted connection gets its own daemon thread
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            assert funcname in self.public, '%r unrecognized' % funcname
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # the reply could not be sent; try to report that instead
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # methods listed in method_to_typeid return a proxy
                    # for their result instead of a copy of it
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    # fall back to sending a textual form of the message
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # used by serve_client() when a request fails with AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj.keys()):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately broad: the process exits in the finally
                # clause whatever happens, so just report the problem
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # no callable registered: the caller supplies the object
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident`

        The object is dropped from the server once the count reaches zero.
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
417
418#
419# Class to represent state of a manager
420#
421
class State(object):
    '''
    Holder for the state of a manager

    `value` is expected to be one of the class constants below.
    '''
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2
427
428#
429# Mapping from serializer name to Listener and Client types
430#
431
# Each serializer name maps to the (Listener, Client) pair used to talk
# to a manager server with that serialization.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
436
437#
438# Definition of BaseManager
439#
440
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # NOTE(review): `from_address` is not defined in the visible part
        # of this file -- confirm it exists before relying on pickling.
        return type(self).from_address, \
               (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            # a round trip of the no-op 'dummy' request proves that the
            # server is reachable and accepts our authkey
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` is only assigned by start(); a manager which was never
        # started (or was only connect()ed) has nothing to join
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)
            if not process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join()/terminate() below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # give each manager class its own registry on first registration
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # drop the reference the server took inside create()
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
655
656#
657# Subclass of set which get cleared after a fork
658#
659
class ProcessLocalSet(set):
    '''
    Subclass of set which is registered to be cleared after a fork and
    which pickles as an empty set of the same type
    '''
    def __init__(self):
        util.register_after_fork(self, lambda obj: obj.clear())

    def __reduce__(self):
        # contents are deliberately not included in the pickle
        return type(self), ()
665
666#
667# Definition of BaseProxy
668#
669
670class BaseProxy(object):
671 '''
672 A base for proxies of shared objects
673 '''
674 _address_to_local = {}
675 _mutex = util.ForkAwareThreadLock()
676
677 def __init__(self, token, serializer, manager=None,
678 authkey=None, exposed=None, incref=True):
679 BaseProxy._mutex.acquire()
680 try:
681 tls_idset = BaseProxy._address_to_local.get(token.address, None)
682 if tls_idset is None:
683 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
684 BaseProxy._address_to_local[token.address] = tls_idset
685 finally:
686 BaseProxy._mutex.release()
687
688 # self._tls is used to record the connection used by this
689 # thread to communicate with the manager at token.address
690 self._tls = tls_idset[0]
691
692 # self._idset is used to record the identities of all shared
693 # objects for which the current process owns references and
694 # which are in the manager at token.address
695 self._idset = tls_idset[1]
696
697 self._token = token
698 self._id = self._token.id
699 self._manager = manager
700 self._serializer = serializer
701 self._Client = listener_client[serializer][1]
702
703 if authkey is not None:
704 self._authkey = AuthenticationString(authkey)
705 elif self._manager is not None:
706 self._authkey = self._manager._authkey
707 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000708 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000709
710 if incref:
711 self._incref()
712
713 util.register_after_fork(self, BaseProxy._after_fork)
714
715 def _connect(self):
716 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000717 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000718 if threading.current_thread().name != 'MainThread':
719 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000720 conn = self._Client(self._token.address, authkey=self._authkey)
721 dispatch(conn, None, 'accept_connection', (name,))
722 self._tls.connection = conn
723
724 def _callmethod(self, methodname, args=(), kwds={}):
725 '''
726 Try to call a method of the referrent and return a copy of the result
727 '''
728 try:
729 conn = self._tls.connection
730 except AttributeError:
731 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000732 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000733 self._connect()
734 conn = self._tls.connection
735
736 conn.send((self._id, methodname, args, kwds))
737 kind, result = conn.recv()
738
739 if kind == '#RETURN':
740 return result
741 elif kind == '#PROXY':
742 exposed, token = result
743 proxytype = self._manager._registry[token.typeid][-1]
Jesse Noller824f4f32008-09-02 19:12:20 +0000744 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000745 token, self._serializer, manager=self._manager,
746 authkey=self._authkey, exposed=exposed
747 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000748 conn = self._Client(token.address, authkey=self._authkey)
749 dispatch(conn, None, 'decref', (token.id,))
750 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000751 raise convert_to_error(kind, result)
752
753 def _getvalue(self):
754 '''
755 Get a copy of the value of the referent
756 '''
757 return self._callmethod('#GETVALUE')
758
759 def _incref(self):
760 conn = self._Client(self._token.address, authkey=self._authkey)
761 dispatch(conn, None, 'incref', (self._id,))
762 util.debug('INCREF %r', self._token.id)
763
764 self._idset.add(self._id)
765
766 state = self._manager and self._manager._state
767
768 self._close = util.Finalize(
769 self, BaseProxy._decref,
770 args=(self._token, self._authkey, state,
771 self._tls, self._idset, self._Client),
772 exitpriority=10
773 )
774
775 @staticmethod
776 def _decref(token, authkey, state, tls, idset, _Client):
777 idset.discard(token.id)
778
779 # check whether manager is still alive
780 if state is None or state.value == State.STARTED:
781 # tell manager this process no longer cares about referent
782 try:
783 util.debug('DECREF %r', token.id)
784 conn = _Client(token.address, authkey=authkey)
785 dispatch(conn, None, 'decref', (token.id,))
786 except Exception as e:
787 util.debug('... decref failed %s', e)
788
789 else:
790 util.debug('DECREF %r -- manager already shutdown', token.id)
791
792 # check whether we can close this thread's connection because
793 # the process owns no more references to objects for this manager
794 if not idset and hasattr(tls, 'connection'):
795 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000796 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000797 tls.connection.close()
798 del tls.connection
799
800 def _after_fork(self):
801 self._manager = None
802 try:
803 self._incref()
804 except Exception as e:
805 # the proxy may just be for a manager which has shutdown
806 util.info('incref failed: %s' % e)
807
808 def __reduce__(self):
809 kwds = {}
810 if Popen.thread_is_spawning():
811 kwds['authkey'] = self._authkey
812
813 if getattr(self, '_isauto', False):
814 kwds['exposed'] = self._exposed_
815 return (RebuildProxy,
816 (AutoProxy, self._token, self._serializer, kwds))
817 else:
818 return (RebuildProxy,
819 (type(self), self._token, self._serializer, kwds))
820
821 def __deepcopy__(self, memo):
822 return self._getvalue()
823
824 def __repr__(self):
825 return '<%s object, typeid %r at %s>' % \
826 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
827
828 def __str__(self):
829 '''
830 Return representation of the referent (or a fall-back if that fails)
831 '''
832 try:
833 return self._callmethod('__repr__')
834 except Exception:
835 return repr(self)[:-1] + "; '__str__()' failed>"
836
837#
838# Function used for unpickling
839#
840
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    server = getattr(current_process(), '_manager_server', None)

    if server and server.address == token.address:
        # we are inside the server which owns the referent
        return server.id_to_obj[token.id][0]

    # no incref while the current process is flagged as `_inheriting`
    incref = (
        kwds.pop('incref', True) and
        not getattr(current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
856 return func(token, serializer, incref=incref, **kwds)
857
858#
859# Functions to create proxies and proxy types
860#
861
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return an proxy type whose methods are given by `exposed`

    Each name in `exposed` becomes a method which forwards its arguments
    to `self._callmethod()`.  `_cache` is deliberately a shared default so
    that the same (name, exposed) pair always yields the same type.
    '''
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        # exec lets each generated function carry its real method name
        exec('''def %s(self, *args, **kwds):
    return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType
881 return ProxyType
882
883
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None the method names are fetched from the server
    with a 'get_methods' request.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before opening any connection, so that the
    # 'get_methods' request below is authenticated with the same key
    # the proxy itself will use.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marker read by BaseProxy.__reduce__
    proxy._isauto = True
    return proxy
908
909#
910# Types/callables which we will register with SyncManager
911#
912
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes starting with '_' are left out of the repr
        parts = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(parts)
923 return 'Namespace(%s)' % str.join(', ', temp)
924
class Value(object):
    '''
    Holder for a single value together with a typecode

    `lock` is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)

    value = property(get, set)
935 value = property(get, set)
936
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialised from `sequence`

    `lock` is accepted but not used.
    '''
    return array.array(typecode, sequence)
939
940#
941# Proxy types used by SyncManager
942#
943
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator/generator; each call is forwarded to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
955 return self._callmethod('close', args)
956
957
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release(); usable as a context manager
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when one was actually given
        args = (blocking,) if timeout is None else (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
968 return self._callmethod('release')
969
970
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for condition-variable style objects
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed

        `predicate` is evaluated locally, in the calling process; only the
        individual `wait()` calls go to the manager.  Returns the last
        value of `predicate()`.
        '''
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = _time() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - _time()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
995 return result
996
Benjamin Petersone711caf2008-06-11 16:44:04 +0000997
class EventProxy(BaseProxy):
    '''
    Proxy exposing `is_set`, `set`, `clear` and `wait`.

    Each method forwards to the same-named method through `_callmethod`.
    '''
    _exposed_ = (
        'is_set',
        'set',
        'clear',
        'wait',
        )

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        # `timeout` is always forwarded, even when it is None.
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1008
class NamespaceProxy(BaseProxy):
    '''
    Proxy that forwards attribute access through `_callmethod`.

    Names starting with an underscore are handled on the proxy object
    itself; every other name is forwarded as `__getattribute__`,
    `__setattr__` or `__delattr__`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    # `str.startswith` is used rather than `key[0]` so that an empty
    # attribute name is forwarded like any other public name instead of
    # raising IndexError in the proxy.

    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # Fetch `_callmethod` via `object` to bypass the overrides in
        # this class.
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1026
1027
class ValueProxy(BaseProxy):
    '''
    Proxy exposing `get` and `set`, also reachable as the `value` property.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        '''
        Forward `get` through `_callmethod`.
        '''
        return self._callmethod('get')

    def set(self, value):
        '''
        Forward `set(value)` through `_callmethod`.
        '''
        set_args = (value,)
        return self._callmethod('set', set_args)

    # Reading `proxy.value` calls get(); assigning to it calls set().
    value = property(fget=get, fset=set)
1035
1036
# Proxy type generated by MakeProxyType for the listed method names.
# XXX __getslice__ and __setslice__ unneeded in Py3.0
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__delslice__',
        '__getitem__', '__getslice__', '__len__', '__mul__',
        '__reversed__', '__rmul__', '__setitem__', '__setslice__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)
class ListProxy(BaseListProxy):
    '''
    `BaseListProxy` with in-place `+=` and `*=` that return the proxy.
    '''

    def __iadd__(self, value):
        # `+=` is forwarded as `extend`; the proxy itself is returned so
        # the augmented assignment rebinds the name to the same proxy.
        extend_args = (value,)
        self._callmethod('extend', extend_args)
        return self

    def __imul__(self, value):
        # The forwarded call's result is discarded in favour of the proxy.
        imul_args = (value,)
        self._callmethod('__imul__', imul_args)
        return self
1051
1052
# Proxy type generated by MakeProxyType for the listed method names.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
1058
1059
# Proxy type generated by MakeProxyType for the listed method names.
# XXX __getslice__ and __setslice__ unneeded in Py3.0
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    (
        '__len__',
        '__getitem__',
        '__setitem__',
        '__getslice__',
        '__setslice__',
    ),
)
1063
1064
# Proxy type generated by MakeProxyType for the listed method names.
PoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)

# Maps a method name to the typeid ('AsyncResult' or 'Iterator')
# associated with that method's result.
PoolProxy._method_to_typeid_ = dict(
    apply_async='AsyncResult',
    map_async='AsyncResult',
    starmap_async='AsyncResult',
    imap='Iterator',
    imap_unordered='Iterator',
)
1076
1077#
1078# Definition of SyncManager
1079#
1080
class SyncManager(BaseManager):
    '''
    A `BaseManager` subclass with a set of shared object types registered.

    The class body adds nothing itself; the types are attached by the
    module-level `SyncManager.register()` calls that follow it.  They
    cover the thread synchronization types, plus `dict`, `list` and
    `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''
1091
# queues (registered without an explicit proxy type)
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# synchronization primitives from the threading module
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register(
    'BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy
)
SyncManager.register('Condition', threading.Condition, ConditionProxy)

# process pool
SyncManager.register('Pool', Pool, PoolProxy)

# containers and values
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)