blob: f580e9ec1e0e8241b966b54ec61312735c776434 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
Charles-François Natalic8ce7152012-04-17 18:45:57 +020022from time import time as _time
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
26from . import pool
27from . import process
28from . import popen
29from . import reduction
30from . import util
Benjamin Petersone711caf2008-06-11 16:44:04 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032#
Benjamin Petersone711caf2008-06-11 16:44:04 +000033# Register some things for pickling
34#
35
def reduce_array(a):
    '''
    Reduction function for `array.array` objects

    Returns the `(callable, args)` pair used for pickling: the array is
    rebuilt from its typecode and its raw bytes.
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
# Arrays are pickled through reduce_array
reduction.register(array.array, reduce_array)

# Concrete types of the objects returned by dict.items/keys/values
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        '''
        Reduction function which pickles a dict view as a plain list
        '''
        contents = list(obj)
        return list, (contents,)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
47#
48# Type for identifying shared objects
49#
50
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token records the registered typeid of the object, the address of
    the manager holding it and the id of the object within that manager.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.__setstate__((typeid, address, id))

    def __getstate__(self):
        # __slots__ classes have no __dict__, so pickle the fields as a tuple
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
69
70#
71# Function for communication with a manager's server process
72#
73
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple `(id, methodname, args, kwds)`.  A '#RETURN'
    reply yields its payload; any other reply kind is turned into an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
83
def convert_to_error(kind, result):
    '''
    Convert a non-'#RETURN' reply from the manager into an exception object

    `kind` is the reply tag and `result` its payload:

      '#ERROR'          -- `result` is returned unchanged
      '#TRACEBACK'      -- `result` is a traceback string; a RemoteError
                           wrapping it is returned
      '#UNSERIALIZABLE' -- `result` is the repr of a message which could not
                           be sent; a RemoteError describing it is returned

    Any other kind produces a ValueError naming the unrecognized kind.  The
    exception is returned, not raised; the caller decides when to raise it.

    Raises TypeError if a traceback/unserializable payload is not a string.
    '''
    if kind == '#ERROR':
        return result
    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        # Explicit check rather than an assert so that it still happens
        # when Python runs with -O.
        if not isinstance(result, str):
            raise TypeError(
                'Result %r (kind %r) type is %s, not str' %
                (result, kind, type(result))
                )
        if kind == '#UNSERIALIZABLE':
            return RemoteError('Unserializable message: %s\n' % result)
        return RemoteError(result)
    else:
        return ValueError('Unrecognized message type %r' % (kind,))
95
class RemoteError(Exception):
    '''
    Exception wrapping text (such as a traceback) received from the manager

    str() shows the first argument between two rules of 75 dashes.
    '''
    def __str__(self):
        rule = '-' * 75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
99
100#
101# Functions for finding the method names of an object
102#
103
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    Every name reported by dir(obj) whose attribute value is callable is
    included, in dir() order.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
114
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    names = []
    for name in all_methods(obj):
        if name[0] != '_':
            names.append(name)
    return names
120
121#
122# Server which is run in a process controlled by a manager
123#
124
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    The server owns the shared objects (`id_to_obj`) together with their
    reference counts (`id_to_refcount`).  Each incoming connection is first
    handled by handle_request(), which authenticates the peer and runs one
    of the functions named in `public`.
    '''
    # Names of the methods a client may invoke through handle_request()
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener for the server

        `registry` maps typeids to (callable, exposed, method_to_typeid,
        proxytype) tuples, `authkey` must be a bytes object and `serializer`
        is a key of `listener_client`.

        Raises TypeError if `authkey` is not bytes.
        '''
        # Explicit check rather than an assert so that it survives -O
        if not isinstance(authkey, bytes):
            raise TypeError(
                'Authkey %r is type %s, not bytes' % (authkey, type(authkey))
                )
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident '0' is a placeholder entry which is never reference counted
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()

    def serve_forever(self):
        '''
        Run the server forever

        Starts a daemon thread accepting connections and then waits until
        `stop_event` is set.  Always finishes by calling sys.exit(0).
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, receives a single request and runs the
        named public function.  The reply is ('#RETURN', result) on success
        or ('#TRACEBACK', text) on any failure.  The connection is closed
        afterwards.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # This check is what restricts clients to the public functions,
            # so it must not be an assert (asserts are removed under -O).
            if funcname not in self.public:
                raise ValueError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # best effort: try to tell the client why the reply failed
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `stop_event` is set, receiving (ident, methodname, args,
        kwds) requests and replying with '#RETURN', '#PROXY', '#ERROR',
        '#TRACEBACK' or '#UNSERIALIZABLE' messages.  The thread exits when
        the peer closes the connection or when a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # methods listed in method_to_typeid return a proxy
                    # for their result instead of a copy of it
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is not exposed or missing: see whether
                    # one of the fallback functions can serve the request
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Return the referent itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Return str() of the referent
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Return repr() of the referent
        '''
        return repr(obj)

    # Functions which serve_client() falls back on when a request raises
    # AttributeError; they are called as func(self, conn, ident, obj, ...)
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting

        The result is a string with one entry per shared object (sorted by
        ident) showing its refcount and the first 75 characters of str() of
        the object.
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request and then sets `stop_event`; the event is
        set even if the acknowledgement could not be sent.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns the pair `(ident, exposed)` where `exposed` is a tuple of
        the method names which may be called on the object.

        Raises ValueError if `typeid` was registered without a callable and
        the arguments are not exactly one positional argument, and
        TypeError if the registered method_to_typeid is not a dict.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                # no callable registered: the single argument is the object
                if kwds or len(args) != 1:
                    raise ValueError(
                        'Without callable, must have one non-keyword argument'
                        )
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        'Method_to_typeid %r: type %s, not dict' %
                        (method_to_typeid, type(method_to_typeid))
                        )
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the shared object `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of the shared object `ident`

        The object is disposed of when its count reaches zero.  Raises
        AssertionError if the count is already below one.
        '''
        with self.mutex:
            # raised explicitly so that the check still happens under -O
            if self.id_to_refcount[ident] < 1:
                raise AssertionError(
                    'Id %s has refcount %s, not 1+' %
                    (ident, self.id_to_refcount[ident])
                    )
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
410
411#
412# Class to represent state of a manager
413#
414
class State(object):
    '''
    Holder for the state of a manager

    `value` is one of the class constants INITIAL, STARTED or SHUTDOWN.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
420
421#
422# Mapping from serializer name to Listener and Client types
423#
424
# Each serializer name maps to the pair (Listener type, Client type)
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
429
430#
431# Definition of BaseManager
432#
433
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns a server process with start() or attaches to
    an existing one with connect().  Types of shared object are made
    available with the register() classmethod.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        '''
        Initialize the manager without starting or contacting a server

        If `authkey` is None the authkey of the current process is used.
        `serializer` must be a key of `listener_client`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        # set by start(); defined here so that join() is safe to call on
        # a manager which never spawned a server process
        self._process = None

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # NOTE(review): this uses the module level Server rather than
        # self._Server (which _run_server uses) -- confirm this is intended
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process

        A 'dummy' request is sent to check that the server can be reached
        and accepts the authkey; the manager is then marked as started.
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the connection is only needed for this one request
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is not None then the server process calls
        `initializer(*initargs)` before creating the server.

        Raises TypeError if `initializer` is neither None nor callable.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = process.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        if self._process is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        # start the server if that has not been done already
        if self._state.value == State.INITIAL:
            self.start()
        assert self._state.value == State.STARTED
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: the process is terminated below if needed
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `callable` creates the shared objects, `proxytype` is the type used
        for their proxies (AutoProxy if None), `exposed` is the sequence of
        method names proxies may call and `method_to_typeid` maps names of
        methods to the typeid of the proxy they should return.  Unless
        `create_method` is false a method called `typeid` which creates a
        shared object and returns a proxy for it is added to the class.
        '''
        # give this class its own registry rather than mutating an
        # inherited one
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the proxy now exists, so drop the reference which the
                # server took on our behalf when it created the object
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
647
648#
# Subclass of set which gets cleared after a fork
650#
651
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork

    Pickling yields a new empty set of the same type.
    '''
    def __init__(self):
        def _forget_contents(obj):
            obj.clear()
        util.register_after_fork(self, _forget_contents)

    def __reduce__(self):
        # the contents are deliberately not part of the pickled state
        return (type(self), ())
657
658#
659# Definition of BaseProxy
660#
661
class BaseProxy(object):
    '''
    A base for proxies of shared objects

    A proxy forwards method calls to the referent held by the manager at
    `token.address`.  Calls travel over a per-thread connection which is
    created on first use; reference count changes use short-lived
    connections which are closed as soon as the reply has arrived.
    '''
    # maps a manager address to (thread local storage, set of idents)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        Create a proxy for the shared object identified by `token`

        The authkey is taken from `authkey`, else from `manager`, else from
        the current process.  If `incref` is true the manager is told that
        there is a new reference to the referent.
        '''
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager

        The connection is kept in the thread local storage; the server is
        given a name built from the current process and thread names.
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a proxy for the result; replies
        other than '#RETURN' and '#PROXY' are raised as exceptions.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # the new proxy holds its own reference, so release the one
            # the server took when it created the object; a separate
            # connection is used and closed once the reply has arrived
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager about a new reference to the referent

        Also records the ident in the idset and registers the finalizer
        which releases the reference again.
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release this process's reference to the referent of `token`

        Failures to contact the manager are logged and otherwise ignored.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        Forget the manager object and take a reference of our own
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        # the authkey is only included while pickling for a spawned process
        if popen.get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %s>' % \
               (type(self).__name__, self._token.typeid, '0x%x' % id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
829
830#
831# Function used for unpickling
832#
833
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    this_process = process.current_process()
    server = getattr(this_process, '_manager_server', None)

    # unpickling inside the manager's own server: hand back the referent
    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    # no new reference is taken while the current process is inheriting
    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(process.current_process(), '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
850
851#
852# Functions to create proxies and proxy types
853#
854
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are cached by (name, exposed), so asking again for the same
    combination returns the same type object.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # Source of one forwarding method.  The body is indented by a single
    # space, which is all that Python requires.
    template = '''def %s(self, *args, **kwds):
 return self._callmethod(%r, args, kwds)'''

    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
875
876
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    The authkey is taken from `authkey`, else from `manager`, else from the
    current process.  If `exposed` is None the server is asked which
    methods the referent has.
    '''
    _Client = listener_client[serializer][1]

    # Work out the authkey before contacting the server: the get_methods
    # request below must be made with the same key the proxy will use,
    # not with None.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marks the proxy so that BaseProxy.__reduce__ pickles it as AutoProxy
    proxy._isauto = True
    return proxy
901
902#
903# Types/callables which we will register with SyncManager
904#
905
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments

    repr() lists the attributes whose names do not start with '_', sorted.
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return 'Namespace(%s)' % ', '.join(shown)
916 return 'Namespace(%s)' % str.join(', ', temp)
917
class Value(object):
    '''
    Holder for a single value and its typecode

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        '''
        Return the stored value
        '''
        return self._value

    def set(self, value):
        '''
        Replace the stored value
        '''
        self._value = value

    def __repr__(self):
        parts = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % parts

    # attribute style access to get()/set()
    value = property(get, set)
928 value = property(get, set)
929
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of type `typecode` initialized from `sequence`

    The `lock` argument is accepted but not used.
    '''
    new_array = array.array(typecode, sequence)
    return new_array
932
933#
934# Proxy types used by SyncManager
935#
936
class IteratorProxy(BaseProxy):
    '''
    Proxy type for iterators and generators

    Each method forwards its positional arguments to the method of the
    same name on the referent.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
949
950
class AcquirerProxy(BaseProxy):
    '''
    Proxy exposing `acquire` and `release`, usable in a `with` statement.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # The timeout is only sent along when the caller supplied one.
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # Calls 'acquire' with no arguments and returns its result.
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The exception details are not inspected; 'release' is always called.
        return self._callmethod('release')
962
963
class ConditionProxy(AcquirerProxy):
    '''
    Proxy exposing `acquire`, `release`, `wait`, `notify` and `notify_all`.

    `wait_for()` is not an exposed method: it runs in the calling process
    and is built on top of `wait()`.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def notify(self):
        return self._callmethod('notify')
    def notify_all(self):
        return self._callmethod('notify_all')
    def wait_for(self, predicate, timeout=None):
        '''
        Call `wait()` repeatedly until `predicate()` is true or `timeout`
        seconds have elapsed.

        `predicate` is a callable taking no arguments; it is evaluated in
        the calling process.  `timeout` is a number of seconds, or None to
        wait without a deadline.  The last value returned by `predicate()`
        is returned, so the result is false when the deadline expired first.
        '''
        # The deadline is measured with a monotonic clock so that changes
        # to the system wall-clock time cannot shorten or extend the wait.
        from time import monotonic as _monotonic
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = _monotonic() + timeout
        else:
            endtime = None
        waittime = None
        while not result:
            if endtime is not None:
                # Only wait for whatever remains of the overall timeout.
                waittime = endtime - _monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
989
Benjamin Petersone711caf2008-06-11 16:44:04 +0000990
class EventProxy(BaseProxy):
    '''
    Proxy exposing `is_set`, `set`, `clear` and `wait`.

    Each method passes its own name (and, for `wait`, the timeout) to
    `self._callmethod()` and returns the result.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        # The timeout is always sent, even when it is None.
        return self._callmethod('wait', (timeout,))
1001
Richard Oudkerk3730a172012-06-15 18:26:07 +01001002
class BarrierProxy(BaseProxy):
    '''
    Proxy exposing `wait`, `abort` and `reset`, plus `__getattribute__`
    which backs the read-only `parties`, `n_waiting` and `broken`
    properties.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    # Each property below asks for one attribute by name through the
    # exposed '__getattribute__' method.
    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1020
1021
class NamespaceProxy(BaseProxy):
    '''
    Proxy exposing `__getattribute__`, `__setattr__` and `__delattr__`.

    Attribute names whose first character is an underscore are handled on
    the proxy object itself; every other name is passed to `_callmethod()`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # `_callmethod` is fetched via object.__getattribute__ rather
            # than through normal attribute access on the proxy.
            forward = object.__getattribute__(self, '_callmethod')
            return forward('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            forward = object.__getattribute__(self, '_callmethod')
            return forward('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            forward = object.__getattribute__(self, '_callmethod')
            return forward('__delattr__', (key,))
        return object.__delattr__(self, key)
1039
1040
class ValueProxy(BaseProxy):
    '''
    Proxy exposing `get` and `set`; the `value` property is built from
    those two methods.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    value = property(get, set)
1048
1049
# Base proxy type for lists, generated from the tuple of exposed method names.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__'
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.
    '''

    def __iadd__(self, value):
        # `+=` is carried out by calling 'extend'; the proxy is returned
        # so the name on the left-hand side stays bound to it.
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # The result of the '__imul__' call is discarded; the proxy itself
        # is what gets returned.
        self._callmethod('__imul__', (value,))
        return self
1063
1064
# Proxy type for dicts, generated from the tuple of exposed method names.
# NOTE(review): 'has_key' is not a method of dict on Python 3 -- confirm
# whether it still needs to be listed here.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ),
)
1070
1071
# Proxy type exposing only length and item get/set.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001075
1076
# Proxy type for pools, generated from the tuple of exposed method names.
PoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
    ),
)

# Maps a method name to the typeid associated with that method's result.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}
1088
1089#
1090# Definition of SyncManager
1091#
1092
class SyncManager(BaseManager):
    '''
    A `BaseManager` subclass with a set of shared object types registered.

    The registered types are the ones meant for synchronizing threads,
    together with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''
1103
# Queue types: registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# Synchronization primitives from the threading module.
SyncManager.register('Event', threading.Event, proxytype=EventProxy)
SyncManager.register('Lock', threading.Lock, proxytype=AcquirerProxy)
SyncManager.register('RLock', threading.RLock, proxytype=AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', threading.Condition,
                     proxytype=ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, proxytype=BarrierProxy)

# Worker pool and container / value types.
SyncManager.register('Pool', pool.Pool, proxytype=PoolProxy)
SyncManager.register('list', list, proxytype=ListProxy)
SyncManager.register('dict', dict, proxytype=DictProxy)
SyncManager.register('Value', Value, proxytype=ValueProxy)
SyncManager.register('Array', Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', Namespace, proxytype=NamespaceProxy)

# Types returned by methods of PoolProxy; registered with
# create_method=False.
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)