blob: 705909540005bec38847bee5d3282284af8a8ea3 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
22from traceback import format_exc
23from multiprocessing import Process, current_process, active_children, Pool, util, connection
24from multiprocessing.process import AuthenticationString
Florent Xicluna04842a82011-11-11 20:05:50 +010025from multiprocessing.forking import exit, Popen, ForkingPickler
Charles-François Natalic8ce7152012-04-17 18:45:57 +020026from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028#
Benjamin Petersone711caf2008-06-11 16:44:04 +000029# Register some things for pickling
30#
31
32def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000033 return array.array, (a.typecode, a.tobytes())
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000034ForkingPickler.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000035
36view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000037if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000038 def rebuild_as_list(obj):
39 return list, (list(obj),)
40 for view_type in view_types:
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000041 ForkingPickler.register(view_type, rebuild_as_list)
Amaury Forgeot d'Arcd757e732008-08-20 08:58:40 +000042 import copyreg
43 copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
45#
46# Type for identifying shared objects
47#
48
49class Token(object):
50 '''
51 Type to uniquely indentify a shared object
52 '''
53 __slots__ = ('typeid', 'address', 'id')
54
55 def __init__(self, typeid, address, id):
56 (self.typeid, self.address, self.id) = (typeid, address, id)
57
58 def __getstate__(self):
59 return (self.typeid, self.address, self.id)
60
61 def __setstate__(self, state):
62 (self.typeid, self.address, self.id) = state
63
64 def __repr__(self):
65 return 'Token(typeid=%r, address=%r, id=%r)' % \
66 (self.typeid, self.address, self.id)
67
68#
69# Function for communication with a manager's server process
70#
71
72def dispatch(c, id, methodname, args=(), kwds={}):
73 '''
74 Send a message to manager using connection `c` and return response
75 '''
76 c.send((id, methodname, args, kwds))
77 kind, result = c.recv()
78 if kind == '#RETURN':
79 return result
80 raise convert_to_error(kind, result)
81
82def convert_to_error(kind, result):
83 if kind == '#ERROR':
84 return result
85 elif kind == '#TRACEBACK':
86 assert type(result) is str
87 return RemoteError(result)
88 elif kind == '#UNSERIALIZABLE':
89 assert type(result) is str
90 return RemoteError('Unserializable message: %s\n' % result)
91 else:
92 return ValueError('Unrecognized message type')
93
94class RemoteError(Exception):
95 def __str__(self):
96 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
97
98#
99# Functions for finding the method names of an object
100#
101
102def all_methods(obj):
103 '''
104 Return a list of names of methods of `obj`
105 '''
106 temp = []
107 for name in dir(obj):
108 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200109 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000110 temp.append(name)
111 return temp
112
113def public_methods(obj):
114 '''
115 Return a list of names of methods of `obj` which do not start with '_'
116 '''
117 return [name for name in all_methods(obj) if name[0] != '_']
118
119#
120# Server which is run in a process controlled by a manager
121#
122
123class Server(object):
124 '''
125 Server class which runs in a process controlled by a manager object
126 '''
127 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
128 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
129
130 def __init__(self, registry, address, authkey, serializer):
131 assert isinstance(authkey, bytes)
132 self.registry = registry
133 self.authkey = AuthenticationString(authkey)
134 Listener, Client = listener_client[serializer]
135
136 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100137 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000138 self.address = self.listener.address
139
Jesse Noller63b3a972009-01-21 02:15:48 +0000140 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000141 self.id_to_refcount = {}
142 self.mutex = threading.RLock()
143 self.stop = 0
144
145 def serve_forever(self):
146 '''
147 Run the server forever
148 '''
149 current_process()._manager_server = self
150 try:
151 try:
152 while 1:
153 try:
154 c = self.listener.accept()
155 except (OSError, IOError):
156 continue
157 t = threading.Thread(target=self.handle_request, args=(c,))
Benjamin Petersonfae4c622008-08-18 18:40:08 +0000158 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159 t.start()
160 except (KeyboardInterrupt, SystemExit):
161 pass
162 finally:
163 self.stop = 999
164 self.listener.close()
165
166 def handle_request(self, c):
167 '''
168 Handle a new connection
169 '''
170 funcname = result = request = None
171 try:
172 connection.deliver_challenge(c, self.authkey)
173 connection.answer_challenge(c, self.authkey)
174 request = c.recv()
175 ignore, funcname, args, kwds = request
176 assert funcname in self.public, '%r unrecognized' % funcname
177 func = getattr(self, funcname)
178 except Exception:
179 msg = ('#TRACEBACK', format_exc())
180 else:
181 try:
182 result = func(c, *args, **kwds)
183 except Exception:
184 msg = ('#TRACEBACK', format_exc())
185 else:
186 msg = ('#RETURN', result)
187 try:
188 c.send(msg)
189 except Exception as e:
190 try:
191 c.send(('#TRACEBACK', format_exc()))
192 except Exception:
193 pass
194 util.info('Failure to send message: %r', msg)
195 util.info(' ... request was %r', request)
196 util.info(' ... exception was %r', e)
197
198 c.close()
199
200 def serve_client(self, conn):
201 '''
202 Handle requests from the proxies in a particular process/thread
203 '''
204 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000205 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206
207 recv = conn.recv
208 send = conn.send
209 id_to_obj = self.id_to_obj
210
211 while not self.stop:
212
213 try:
214 methodname = obj = None
215 request = recv()
216 ident, methodname, args, kwds = request
217 obj, exposed, gettypeid = id_to_obj[ident]
218
219 if methodname not in exposed:
220 raise AttributeError(
221 'method %r of %r object is not in exposed=%r' %
222 (methodname, type(obj), exposed)
223 )
224
225 function = getattr(obj, methodname)
226
227 try:
228 res = function(*args, **kwds)
229 except Exception as e:
230 msg = ('#ERROR', e)
231 else:
232 typeid = gettypeid and gettypeid.get(methodname, None)
233 if typeid:
234 rident, rexposed = self.create(conn, typeid, res)
235 token = Token(typeid, self.address, rident)
236 msg = ('#PROXY', (rexposed, token))
237 else:
238 msg = ('#RETURN', res)
239
240 except AttributeError:
241 if methodname is None:
242 msg = ('#TRACEBACK', format_exc())
243 else:
244 try:
245 fallback_func = self.fallback_mapping[methodname]
246 result = fallback_func(
247 self, conn, ident, obj, *args, **kwds
248 )
249 msg = ('#RETURN', result)
250 except Exception:
251 msg = ('#TRACEBACK', format_exc())
252
253 except EOFError:
254 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000255 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256 sys.exit(0)
257
258 except Exception:
259 msg = ('#TRACEBACK', format_exc())
260
261 try:
262 try:
263 send(msg)
264 except Exception as e:
265 send(('#UNSERIALIZABLE', repr(msg)))
266 except Exception as e:
267 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000268 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000269 util.info(' ... message was %r', msg)
270 util.info(' ... exception was %r', e)
271 conn.close()
272 sys.exit(1)
273
274 def fallback_getvalue(self, conn, ident, obj):
275 return obj
276
277 def fallback_str(self, conn, ident, obj):
278 return str(obj)
279
280 def fallback_repr(self, conn, ident, obj):
281 return repr(obj)
282
283 fallback_mapping = {
284 '__str__':fallback_str,
285 '__repr__':fallback_repr,
286 '#GETVALUE':fallback_getvalue
287 }
288
289 def dummy(self, c):
290 pass
291
292 def debug_info(self, c):
293 '''
294 Return some info --- useful to spot problems with refcounting
295 '''
296 self.mutex.acquire()
297 try:
298 result = []
299 keys = list(self.id_to_obj.keys())
300 keys.sort()
301 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000302 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000303 result.append(' %s: refcount=%s\n %s' %
304 (ident, self.id_to_refcount[ident],
305 str(self.id_to_obj[ident][0])[:75]))
306 return '\n'.join(result)
307 finally:
308 self.mutex.release()
309
310 def number_of_objects(self, c):
311 '''
312 Number of shared objects
313 '''
Jesse Noller63b3a972009-01-21 02:15:48 +0000314 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000315
316 def shutdown(self, c):
317 '''
318 Shutdown this process
319 '''
320 try:
321 try:
322 util.debug('manager received shutdown message')
323 c.send(('#RETURN', None))
324
325 if sys.stdout != sys.__stdout__:
326 util.debug('resetting stdout, stderr')
327 sys.stdout = sys.__stdout__
328 sys.stderr = sys.__stderr__
329
330 util._run_finalizers(0)
331
332 for p in active_children():
333 util.debug('terminating a child process of manager')
334 p.terminate()
335
336 for p in active_children():
337 util.debug('terminating a child process of manager')
338 p.join()
339
340 util._run_finalizers()
341 util.info('manager exiting with exitcode 0')
342 except:
343 import traceback
344 traceback.print_exc()
345 finally:
346 exit(0)
347
348 def create(self, c, typeid, *args, **kwds):
349 '''
350 Create a new shared object and return its id
351 '''
352 self.mutex.acquire()
353 try:
354 callable, exposed, method_to_typeid, proxytype = \
355 self.registry[typeid]
356
357 if callable is None:
358 assert len(args) == 1 and not kwds
359 obj = args[0]
360 else:
361 obj = callable(*args, **kwds)
362
363 if exposed is None:
364 exposed = public_methods(obj)
365 if method_to_typeid is not None:
366 assert type(method_to_typeid) is dict
367 exposed = list(exposed) + list(method_to_typeid)
368
369 ident = '%x' % id(obj) # convert to string because xmlrpclib
370 # only has 32 bit signed integers
371 util.debug('%r callable returned object with id %r', typeid, ident)
372
373 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
374 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000375 self.id_to_refcount[ident] = 0
376 # increment the reference count immediately, to avoid
377 # this object being garbage collected before a Proxy
378 # object for it can be created. The caller of create()
379 # is responsible for doing a decref once the Proxy object
380 # has been created.
381 self.incref(c, ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000382 return ident, tuple(exposed)
383 finally:
384 self.mutex.release()
385
386 def get_methods(self, c, token):
387 '''
388 Return the methods of the shared object indicated by token
389 '''
390 return tuple(self.id_to_obj[token.id][1])
391
392 def accept_connection(self, c, name):
393 '''
394 Spawn a new thread to serve this connection
395 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000396 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000397 c.send(('#RETURN', None))
398 self.serve_client(c)
399
400 def incref(self, c, ident):
401 self.mutex.acquire()
402 try:
Jesse Noller824f4f32008-09-02 19:12:20 +0000403 self.id_to_refcount[ident] += 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000404 finally:
405 self.mutex.release()
406
407 def decref(self, c, ident):
408 self.mutex.acquire()
409 try:
410 assert self.id_to_refcount[ident] >= 1
411 self.id_to_refcount[ident] -= 1
412 if self.id_to_refcount[ident] == 0:
413 del self.id_to_obj[ident], self.id_to_refcount[ident]
Benjamin Peterson4ac9ce42009-10-04 14:49:41 +0000414 util.debug('disposing of obj with id %r', ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000415 finally:
416 self.mutex.release()
417
418#
419# Class to represent state of a manager
420#
421
422class State(object):
423 __slots__ = ['value']
424 INITIAL = 0
425 STARTED = 1
426 SHUTDOWN = 2
427
428#
429# Mapping from serializer name to Listener and Client types
430#
431
432listener_client = {
433 'pickle' : (connection.Listener, connection.Client),
434 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
435 }
436
437#
438# Definition of BaseManager
439#
440
441class BaseManager(object):
442 '''
443 Base class for managers
444 '''
445 _registry = {}
446 _Server = Server
447
448 def __init__(self, address=None, authkey=None, serializer='pickle'):
449 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000450 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000451 self._address = address # XXX not final address if eg ('', 0)
452 self._authkey = AuthenticationString(authkey)
453 self._state = State()
454 self._state.value = State.INITIAL
455 self._serializer = serializer
456 self._Listener, self._Client = listener_client[serializer]
457
Benjamin Petersone711caf2008-06-11 16:44:04 +0000458 def get_server(self):
459 '''
460 Return server object with serve_forever() method and address attribute
461 '''
462 assert self._state.value == State.INITIAL
463 return Server(self._registry, self._address,
464 self._authkey, self._serializer)
465
466 def connect(self):
467 '''
468 Connect manager object to the server process
469 '''
470 Listener, Client = listener_client[self._serializer]
471 conn = Client(self._address, authkey=self._authkey)
472 dispatch(conn, None, 'dummy')
473 self._state.value = State.STARTED
474
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000475 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000476 '''
477 Spawn a server process for this manager object
478 '''
479 assert self._state.value == State.INITIAL
480
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200481 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000482 raise TypeError('initializer must be a callable')
483
Benjamin Petersone711caf2008-06-11 16:44:04 +0000484 # pipe over which we will retrieve address of server
485 reader, writer = connection.Pipe(duplex=False)
486
487 # spawn process which runs a server
488 self._process = Process(
489 target=type(self)._run_server,
490 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000491 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000492 )
493 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000494 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000495 self._process.start()
496
497 # get address of server
498 writer.close()
499 self._address = reader.recv()
500 reader.close()
501
502 # register a finalizer
503 self._state.value = State.STARTED
504 self.shutdown = util.Finalize(
505 self, type(self)._finalize_manager,
506 args=(self._process, self._address, self._authkey,
507 self._state, self._Client),
508 exitpriority=0
509 )
510
511 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000512 def _run_server(cls, registry, address, authkey, serializer, writer,
513 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000514 '''
515 Create a server, report its address and run it
516 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000517 if initializer is not None:
518 initializer(*initargs)
519
Benjamin Petersone711caf2008-06-11 16:44:04 +0000520 # create server
521 server = cls._Server(registry, address, authkey, serializer)
522
523 # inform parent process of the server's address
524 writer.send(server.address)
525 writer.close()
526
527 # run the manager
528 util.info('manager serving at %r', server.address)
529 server.serve_forever()
530
531 def _create(self, typeid, *args, **kwds):
532 '''
533 Create a new shared object; return the token and exposed tuple
534 '''
535 assert self._state.value == State.STARTED, 'server not yet started'
536 conn = self._Client(self._address, authkey=self._authkey)
537 try:
538 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
539 finally:
540 conn.close()
541 return Token(typeid, self._address, id), exposed
542
543 def join(self, timeout=None):
544 '''
545 Join the manager process (if it has been spawned)
546 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100547 if self._process is not None:
548 self._process.join(timeout)
549 if not self._process.is_alive():
550 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000551
552 def _debug_info(self):
553 '''
554 Return some info about the servers shared objects and connections
555 '''
556 conn = self._Client(self._address, authkey=self._authkey)
557 try:
558 return dispatch(conn, None, 'debug_info')
559 finally:
560 conn.close()
561
562 def _number_of_objects(self):
563 '''
564 Return the number of shared objects
565 '''
566 conn = self._Client(self._address, authkey=self._authkey)
567 try:
568 return dispatch(conn, None, 'number_of_objects')
569 finally:
570 conn.close()
571
572 def __enter__(self):
573 return self
574
575 def __exit__(self, exc_type, exc_val, exc_tb):
576 self.shutdown()
577
578 @staticmethod
579 def _finalize_manager(process, address, authkey, state, _Client):
580 '''
581 Shutdown the manager process; will be registered as a finalizer
582 '''
583 if process.is_alive():
584 util.info('sending shutdown message to manager')
585 try:
586 conn = _Client(address, authkey=authkey)
587 try:
588 dispatch(conn, None, 'shutdown')
589 finally:
590 conn.close()
591 except Exception:
592 pass
593
594 process.join(timeout=0.2)
595 if process.is_alive():
596 util.info('manager still alive')
597 if hasattr(process, 'terminate'):
598 util.info('trying to `terminate()` manager process')
599 process.terminate()
600 process.join(timeout=0.1)
601 if process.is_alive():
602 util.info('manager still alive after terminate')
603
604 state.value = State.SHUTDOWN
605 try:
606 del BaseProxy._address_to_local[address]
607 except KeyError:
608 pass
609
610 address = property(lambda self: self._address)
611
612 @classmethod
613 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
614 method_to_typeid=None, create_method=True):
615 '''
616 Register a typeid with the manager type
617 '''
618 if '_registry' not in cls.__dict__:
619 cls._registry = cls._registry.copy()
620
621 if proxytype is None:
622 proxytype = AutoProxy
623
624 exposed = exposed or getattr(proxytype, '_exposed_', None)
625
626 method_to_typeid = method_to_typeid or \
627 getattr(proxytype, '_method_to_typeid_', None)
628
629 if method_to_typeid:
630 for key, value in list(method_to_typeid.items()):
631 assert type(key) is str, '%r is not a string' % key
632 assert type(value) is str, '%r is not a string' % value
633
634 cls._registry[typeid] = (
635 callable, exposed, method_to_typeid, proxytype
636 )
637
638 if create_method:
639 def temp(self, *args, **kwds):
640 util.debug('requesting creation of a shared %r object', typeid)
641 token, exp = self._create(typeid, *args, **kwds)
642 proxy = proxytype(
643 token, self._serializer, manager=self,
644 authkey=self._authkey, exposed=exp
645 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000646 conn = self._Client(token.address, authkey=self._authkey)
647 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000648 return proxy
649 temp.__name__ = typeid
650 setattr(cls, typeid, temp)
651
652#
653# Subclass of set which get cleared after a fork
654#
655
656class ProcessLocalSet(set):
657 def __init__(self):
658 util.register_after_fork(self, lambda obj: obj.clear())
659 def __reduce__(self):
660 return type(self), ()
661
662#
663# Definition of BaseProxy
664#
665
666class BaseProxy(object):
667 '''
668 A base for proxies of shared objects
669 '''
670 _address_to_local = {}
671 _mutex = util.ForkAwareThreadLock()
672
673 def __init__(self, token, serializer, manager=None,
674 authkey=None, exposed=None, incref=True):
675 BaseProxy._mutex.acquire()
676 try:
677 tls_idset = BaseProxy._address_to_local.get(token.address, None)
678 if tls_idset is None:
679 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
680 BaseProxy._address_to_local[token.address] = tls_idset
681 finally:
682 BaseProxy._mutex.release()
683
684 # self._tls is used to record the connection used by this
685 # thread to communicate with the manager at token.address
686 self._tls = tls_idset[0]
687
688 # self._idset is used to record the identities of all shared
689 # objects for which the current process owns references and
690 # which are in the manager at token.address
691 self._idset = tls_idset[1]
692
693 self._token = token
694 self._id = self._token.id
695 self._manager = manager
696 self._serializer = serializer
697 self._Client = listener_client[serializer][1]
698
699 if authkey is not None:
700 self._authkey = AuthenticationString(authkey)
701 elif self._manager is not None:
702 self._authkey = self._manager._authkey
703 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000704 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000705
706 if incref:
707 self._incref()
708
709 util.register_after_fork(self, BaseProxy._after_fork)
710
711 def _connect(self):
712 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000713 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000714 if threading.current_thread().name != 'MainThread':
715 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000716 conn = self._Client(self._token.address, authkey=self._authkey)
717 dispatch(conn, None, 'accept_connection', (name,))
718 self._tls.connection = conn
719
720 def _callmethod(self, methodname, args=(), kwds={}):
721 '''
722 Try to call a method of the referrent and return a copy of the result
723 '''
724 try:
725 conn = self._tls.connection
726 except AttributeError:
727 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000728 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729 self._connect()
730 conn = self._tls.connection
731
732 conn.send((self._id, methodname, args, kwds))
733 kind, result = conn.recv()
734
735 if kind == '#RETURN':
736 return result
737 elif kind == '#PROXY':
738 exposed, token = result
739 proxytype = self._manager._registry[token.typeid][-1]
Jesse Noller824f4f32008-09-02 19:12:20 +0000740 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000741 token, self._serializer, manager=self._manager,
742 authkey=self._authkey, exposed=exposed
743 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000744 conn = self._Client(token.address, authkey=self._authkey)
745 dispatch(conn, None, 'decref', (token.id,))
746 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000747 raise convert_to_error(kind, result)
748
749 def _getvalue(self):
750 '''
751 Get a copy of the value of the referent
752 '''
753 return self._callmethod('#GETVALUE')
754
755 def _incref(self):
756 conn = self._Client(self._token.address, authkey=self._authkey)
757 dispatch(conn, None, 'incref', (self._id,))
758 util.debug('INCREF %r', self._token.id)
759
760 self._idset.add(self._id)
761
762 state = self._manager and self._manager._state
763
764 self._close = util.Finalize(
765 self, BaseProxy._decref,
766 args=(self._token, self._authkey, state,
767 self._tls, self._idset, self._Client),
768 exitpriority=10
769 )
770
771 @staticmethod
772 def _decref(token, authkey, state, tls, idset, _Client):
773 idset.discard(token.id)
774
775 # check whether manager is still alive
776 if state is None or state.value == State.STARTED:
777 # tell manager this process no longer cares about referent
778 try:
779 util.debug('DECREF %r', token.id)
780 conn = _Client(token.address, authkey=authkey)
781 dispatch(conn, None, 'decref', (token.id,))
782 except Exception as e:
783 util.debug('... decref failed %s', e)
784
785 else:
786 util.debug('DECREF %r -- manager already shutdown', token.id)
787
788 # check whether we can close this thread's connection because
789 # the process owns no more references to objects for this manager
790 if not idset and hasattr(tls, 'connection'):
791 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000792 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000793 tls.connection.close()
794 del tls.connection
795
796 def _after_fork(self):
797 self._manager = None
798 try:
799 self._incref()
800 except Exception as e:
801 # the proxy may just be for a manager which has shutdown
802 util.info('incref failed: %s' % e)
803
804 def __reduce__(self):
805 kwds = {}
806 if Popen.thread_is_spawning():
807 kwds['authkey'] = self._authkey
808
809 if getattr(self, '_isauto', False):
810 kwds['exposed'] = self._exposed_
811 return (RebuildProxy,
812 (AutoProxy, self._token, self._serializer, kwds))
813 else:
814 return (RebuildProxy,
815 (type(self), self._token, self._serializer, kwds))
816
817 def __deepcopy__(self, memo):
818 return self._getvalue()
819
820 def __repr__(self):
821 return '<%s object, typeid %r at %s>' % \
822 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
823
824 def __str__(self):
825 '''
826 Return representation of the referent (or a fall-back if that fails)
827 '''
828 try:
829 return self._callmethod('__repr__')
830 except Exception:
831 return repr(self)[:-1] + "; '__str__()' failed>"
832
833#
834# Function used for unpickling
835#
836
837def RebuildProxy(func, token, serializer, kwds):
838 '''
839 Function used for unpickling proxy objects.
840
841 If possible the shared object is returned, or otherwise a proxy for it.
842 '''
843 server = getattr(current_process(), '_manager_server', None)
844
845 if server and server.address == token.address:
846 return server.id_to_obj[token.id][0]
847 else:
848 incref = (
849 kwds.pop('incref', True) and
850 not getattr(current_process(), '_inheriting', False)
851 )
852 return func(token, serializer, incref=incref, **kwds)
853
854#
855# Functions to create proxies and proxy types
856#
857
858def MakeProxyType(name, exposed, _cache={}):
859 '''
860 Return an proxy type whose methods are given by `exposed`
861 '''
862 exposed = tuple(exposed)
863 try:
864 return _cache[(name, exposed)]
865 except KeyError:
866 pass
867
868 dic = {}
869
870 for meth in exposed:
871 exec('''def %s(self, *args, **kwds):
872 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
873
874 ProxyType = type(name, (BaseProxy,), dic)
875 ProxyType._exposed_ = exposed
876 _cache[(name, exposed)] = ProxyType
877 return ProxyType
878
879
880def AutoProxy(token, serializer, manager=None, authkey=None,
881 exposed=None, incref=True):
882 '''
883 Return an auto-proxy for `token`
884 '''
885 _Client = listener_client[serializer][1]
886
887 if exposed is None:
888 conn = _Client(token.address, authkey=authkey)
889 try:
890 exposed = dispatch(conn, None, 'get_methods', (token,))
891 finally:
892 conn.close()
893
894 if authkey is None and manager is not None:
895 authkey = manager._authkey
896 if authkey is None:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000897 authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898
899 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
900 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
901 incref=incref)
902 proxy._isauto = True
903 return proxy
904
905#
906# Types/callables which we will register with SyncManager
907#
908
909class Namespace(object):
910 def __init__(self, **kwds):
911 self.__dict__.update(kwds)
912 def __repr__(self):
913 items = list(self.__dict__.items())
914 temp = []
915 for name, value in items:
916 if not name.startswith('_'):
917 temp.append('%s=%r' % (name, value))
918 temp.sort()
919 return 'Namespace(%s)' % str.join(', ', temp)
920
921class Value(object):
922 def __init__(self, typecode, value, lock=True):
923 self._typecode = typecode
924 self._value = value
925 def get(self):
926 return self._value
927 def set(self, value):
928 self._value = value
929 def __repr__(self):
930 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
931 value = property(get, set)
932
933def Array(typecode, sequence, lock=True):
934 return array.array(typecode, sequence)
935
936#
937# Proxy types used by SyncManager
938#
939
940class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000941 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942 def __iter__(self):
943 return self
944 def __next__(self, *args):
945 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000946 def send(self, *args):
947 return self._callmethod('send', args)
948 def throw(self, *args):
949 return self._callmethod('throw', args)
950 def close(self, *args):
951 return self._callmethod('close', args)
952
953
954class AcquirerProxy(BaseProxy):
955 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +0100956 def acquire(self, blocking=True, timeout=None):
957 args = (blocking,) if timeout is None else (blocking, timeout)
958 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000959 def release(self):
960 return self._callmethod('release')
961 def __enter__(self):
962 return self._callmethod('acquire')
963 def __exit__(self, exc_type, exc_val, exc_tb):
964 return self._callmethod('release')
965
966
967class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000968 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000969 def wait(self, timeout=None):
970 return self._callmethod('wait', (timeout,))
971 def notify(self):
972 return self._callmethod('notify')
973 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000974 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200975 def wait_for(self, predicate, timeout=None):
976 result = predicate()
977 if result:
978 return result
979 if timeout is not None:
980 endtime = _time() + timeout
981 else:
982 endtime = None
983 waittime = None
984 while not result:
985 if endtime is not None:
986 waittime = endtime - _time()
987 if waittime <= 0:
988 break
989 self.wait(waittime)
990 result = predicate()
991 return result
992
Benjamin Petersone711caf2008-06-11 16:44:04 +0000993
994class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000995 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000996 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000997 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000998 def set(self):
999 return self._callmethod('set')
1000 def clear(self):
1001 return self._callmethod('clear')
1002 def wait(self, timeout=None):
1003 return self._callmethod('wait', (timeout,))
1004
1005class NamespaceProxy(BaseProxy):
1006 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1007 def __getattr__(self, key):
1008 if key[0] == '_':
1009 return object.__getattribute__(self, key)
1010 callmethod = object.__getattribute__(self, '_callmethod')
1011 return callmethod('__getattribute__', (key,))
1012 def __setattr__(self, key, value):
1013 if key[0] == '_':
1014 return object.__setattr__(self, key, value)
1015 callmethod = object.__getattribute__(self, '_callmethod')
1016 return callmethod('__setattr__', (key, value))
1017 def __delattr__(self, key):
1018 if key[0] == '_':
1019 return object.__delattr__(self, key)
1020 callmethod = object.__getattribute__(self, '_callmethod')
1021 return callmethod('__delattr__', (key,))
1022
1023
1024class ValueProxy(BaseProxy):
1025 _exposed_ = ('get', 'set')
1026 def get(self):
1027 return self._callmethod('get')
1028 def set(self, value):
1029 return self._callmethod('set', (value,))
1030 value = property(get, set)
1031
1032
1033BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001034 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1035 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001036 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1037 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001038 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039class ListProxy(BaseListProxy):
1040 def __iadd__(self, value):
1041 self._callmethod('extend', (value,))
1042 return self
1043 def __imul__(self, value):
1044 self._callmethod('__imul__', (value,))
1045 return self
1046
1047
1048DictProxy = MakeProxyType('DictProxy', (
1049 '__contains__', '__delitem__', '__getitem__', '__len__',
1050 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1051 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1052 ))
1053
1054
1055ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001056 '__len__', '__getitem__', '__setitem__'
1057 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001058
1059
1060PoolProxy = MakeProxyType('PoolProxy', (
1061 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Antoine Pitroude911b22011-12-21 11:03:24 +01001062 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
Benjamin Petersone711caf2008-06-11 16:44:04 +00001063 ))
1064PoolProxy._method_to_typeid_ = {
1065 'apply_async': 'AsyncResult',
1066 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001067 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068 'imap': 'Iterator',
1069 'imap_unordered': 'Iterator'
1070 }
1071
1072#
1073# Definition of SyncManager
1074#
1075
1076class SyncManager(BaseManager):
1077 '''
1078 Subclass of `BaseManager` which supports a number of shared object types.
1079
1080 The types registered are those intended for the synchronization
1081 of threads, plus `dict`, `list` and `Namespace`.
1082
1083 The `multiprocessing.Manager()` function creates started instances of
1084 this class.
1085 '''
1086
1087SyncManager.register('Queue', queue.Queue)
1088SyncManager.register('JoinableQueue', queue.Queue)
1089SyncManager.register('Event', threading.Event, EventProxy)
1090SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1091SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1092SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1093SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1094 AcquirerProxy)
1095SyncManager.register('Condition', threading.Condition, ConditionProxy)
1096SyncManager.register('Pool', Pool, PoolProxy)
1097SyncManager.register('list', list, ListProxy)
1098SyncManager.register('dict', dict, DictProxy)
1099SyncManager.register('Value', Value, ValueProxy)
1100SyncManager.register('Array', Array, ArrayProxy)
1101SyncManager.register('Namespace', Namespace, NamespaceProxy)
1102
1103# types returned by methods of PoolProxy
1104SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1105SyncManager.register('AsyncResult', create_method=False)