blob: 66d46fcc2a9066d092c834efc05ce71e97279661 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import threading
19import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000020import queue
21
Charles-François Natalic8ce7152012-04-17 18:45:57 +020022from time import time as _time
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from traceback import format_exc
24
25from . import connection
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010026from . import context
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010027from . import pool
28from . import process
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029from . import reduction
30from . import util
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010031from . import get_context
Benjamin Petersone711caf2008-06-11 16:44:04 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
Benjamin Petersone711caf2008-06-11 16:44:04 +000034# Register some things for pickling
35#
36
37def reduce_array(a):
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +000038 return array.array, (a.typecode, a.tobytes())
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010039reduction.register(array.array, reduce_array)
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
41view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Petersond61438a2008-06-25 13:04:48 +000042if view_types[0] is not list: # only needed in Py3.0
Benjamin Petersone711caf2008-06-11 16:44:04 +000043 def rebuild_as_list(obj):
44 return list, (list(obj),)
45 for view_type in view_types:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010046 reduction.register(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
48#
49# Type for identifying shared objects
50#
51
52class Token(object):
53 '''
54 Type to uniquely indentify a shared object
55 '''
56 __slots__ = ('typeid', 'address', 'id')
57
58 def __init__(self, typeid, address, id):
59 (self.typeid, self.address, self.id) = (typeid, address, id)
60
61 def __getstate__(self):
62 return (self.typeid, self.address, self.id)
63
64 def __setstate__(self, state):
65 (self.typeid, self.address, self.id) = state
66
67 def __repr__(self):
68 return 'Token(typeid=%r, address=%r, id=%r)' % \
69 (self.typeid, self.address, self.id)
70
71#
72# Function for communication with a manager's server process
73#
74
75def dispatch(c, id, methodname, args=(), kwds={}):
76 '''
77 Send a message to manager using connection `c` and return response
78 '''
79 c.send((id, methodname, args, kwds))
80 kind, result = c.recv()
81 if kind == '#RETURN':
82 return result
83 raise convert_to_error(kind, result)
84
85def convert_to_error(kind, result):
86 if kind == '#ERROR':
87 return result
88 elif kind == '#TRACEBACK':
89 assert type(result) is str
90 return RemoteError(result)
91 elif kind == '#UNSERIALIZABLE':
92 assert type(result) is str
93 return RemoteError('Unserializable message: %s\n' % result)
94 else:
95 return ValueError('Unrecognized message type')
96
97class RemoteError(Exception):
98 def __str__(self):
99 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
100
101#
102# Functions for finding the method names of an object
103#
104
105def all_methods(obj):
106 '''
107 Return a list of names of methods of `obj`
108 '''
109 temp = []
110 for name in dir(obj):
111 func = getattr(obj, name)
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200112 if callable(func):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000113 temp.append(name)
114 return temp
115
116def public_methods(obj):
117 '''
118 Return a list of names of methods of `obj` which do not start with '_'
119 '''
120 return [name for name in all_methods(obj) if name[0] != '_']
121
122#
123# Server which is run in a process controlled by a manager
124#
125
126class Server(object):
127 '''
128 Server class which runs in a process controlled by a manager object
129 '''
130 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
131 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
132
133 def __init__(self, registry, address, authkey, serializer):
134 assert isinstance(authkey, bytes)
135 self.registry = registry
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100136 self.authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000137 Listener, Client = listener_client[serializer]
138
139 # do authentication later
Charles-François Natalife8039b2011-12-23 19:06:48 +0100140 self.listener = Listener(address=address, backlog=16)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000141 self.address = self.listener.address
142
Jesse Noller63b3a972009-01-21 02:15:48 +0000143 self.id_to_obj = {'0': (None, ())}
Benjamin Petersone711caf2008-06-11 16:44:04 +0000144 self.id_to_refcount = {}
145 self.mutex = threading.RLock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146
147 def serve_forever(self):
148 '''
149 Run the server forever
150 '''
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100151 self.stop_event = threading.Event()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100152 process.current_process()._manager_server = self
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100154 accepter = threading.Thread(target=self.accepter)
155 accepter.daemon = True
156 accepter.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000157 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100158 while not self.stop_event.is_set():
159 self.stop_event.wait(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000160 except (KeyboardInterrupt, SystemExit):
161 pass
162 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100163 if sys.stdout != sys.__stdout__:
164 util.debug('resetting stdout, stderr')
165 sys.stdout = sys.__stdout__
166 sys.stderr = sys.__stderr__
167 sys.exit(0)
168
169 def accepter(self):
170 while True:
171 try:
172 c = self.listener.accept()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200173 except OSError:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100174 continue
175 t = threading.Thread(target=self.handle_request, args=(c,))
176 t.daemon = True
177 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178
179 def handle_request(self, c):
180 '''
181 Handle a new connection
182 '''
183 funcname = result = request = None
184 try:
185 connection.deliver_challenge(c, self.authkey)
186 connection.answer_challenge(c, self.authkey)
187 request = c.recv()
188 ignore, funcname, args, kwds = request
189 assert funcname in self.public, '%r unrecognized' % funcname
190 func = getattr(self, funcname)
191 except Exception:
192 msg = ('#TRACEBACK', format_exc())
193 else:
194 try:
195 result = func(c, *args, **kwds)
196 except Exception:
197 msg = ('#TRACEBACK', format_exc())
198 else:
199 msg = ('#RETURN', result)
200 try:
201 c.send(msg)
202 except Exception as e:
203 try:
204 c.send(('#TRACEBACK', format_exc()))
205 except Exception:
206 pass
207 util.info('Failure to send message: %r', msg)
208 util.info(' ... request was %r', request)
209 util.info(' ... exception was %r', e)
210
211 c.close()
212
213 def serve_client(self, conn):
214 '''
215 Handle requests from the proxies in a particular process/thread
216 '''
217 util.debug('starting server thread to service %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000218 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219
220 recv = conn.recv
221 send = conn.send
222 id_to_obj = self.id_to_obj
223
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100224 while not self.stop_event.is_set():
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 try:
227 methodname = obj = None
228 request = recv()
229 ident, methodname, args, kwds = request
230 obj, exposed, gettypeid = id_to_obj[ident]
231
232 if methodname not in exposed:
233 raise AttributeError(
234 'method %r of %r object is not in exposed=%r' %
235 (methodname, type(obj), exposed)
236 )
237
238 function = getattr(obj, methodname)
239
240 try:
241 res = function(*args, **kwds)
242 except Exception as e:
243 msg = ('#ERROR', e)
244 else:
245 typeid = gettypeid and gettypeid.get(methodname, None)
246 if typeid:
247 rident, rexposed = self.create(conn, typeid, res)
248 token = Token(typeid, self.address, rident)
249 msg = ('#PROXY', (rexposed, token))
250 else:
251 msg = ('#RETURN', res)
252
253 except AttributeError:
254 if methodname is None:
255 msg = ('#TRACEBACK', format_exc())
256 else:
257 try:
258 fallback_func = self.fallback_mapping[methodname]
259 result = fallback_func(
260 self, conn, ident, obj, *args, **kwds
261 )
262 msg = ('#RETURN', result)
263 except Exception:
264 msg = ('#TRACEBACK', format_exc())
265
266 except EOFError:
267 util.debug('got EOF -- exiting thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000268 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000269 sys.exit(0)
270
271 except Exception:
272 msg = ('#TRACEBACK', format_exc())
273
274 try:
275 try:
276 send(msg)
277 except Exception as e:
278 send(('#UNSERIALIZABLE', repr(msg)))
279 except Exception as e:
280 util.info('exception in thread serving %r',
Benjamin Peterson72753702008-08-18 18:09:21 +0000281 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000282 util.info(' ... message was %r', msg)
283 util.info(' ... exception was %r', e)
284 conn.close()
285 sys.exit(1)
286
287 def fallback_getvalue(self, conn, ident, obj):
288 return obj
289
290 def fallback_str(self, conn, ident, obj):
291 return str(obj)
292
293 def fallback_repr(self, conn, ident, obj):
294 return repr(obj)
295
296 fallback_mapping = {
297 '__str__':fallback_str,
298 '__repr__':fallback_repr,
299 '#GETVALUE':fallback_getvalue
300 }
301
302 def dummy(self, c):
303 pass
304
305 def debug_info(self, c):
306 '''
307 Return some info --- useful to spot problems with refcounting
308 '''
309 self.mutex.acquire()
310 try:
311 result = []
312 keys = list(self.id_to_obj.keys())
313 keys.sort()
314 for ident in keys:
Jesse Noller63b3a972009-01-21 02:15:48 +0000315 if ident != '0':
Benjamin Petersone711caf2008-06-11 16:44:04 +0000316 result.append(' %s: refcount=%s\n %s' %
317 (ident, self.id_to_refcount[ident],
318 str(self.id_to_obj[ident][0])[:75]))
319 return '\n'.join(result)
320 finally:
321 self.mutex.release()
322
323 def number_of_objects(self, c):
324 '''
325 Number of shared objects
326 '''
Jesse Noller63b3a972009-01-21 02:15:48 +0000327 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000328
329 def shutdown(self, c):
330 '''
331 Shutdown this process
332 '''
333 try:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100334 util.debug('manager received shutdown message')
335 c.send(('#RETURN', None))
336 except:
337 import traceback
338 traceback.print_exc()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000339 finally:
Richard Oudkerk73d9a292012-06-14 15:30:10 +0100340 self.stop_event.set()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000341
342 def create(self, c, typeid, *args, **kwds):
343 '''
344 Create a new shared object and return its id
345 '''
346 self.mutex.acquire()
347 try:
348 callable, exposed, method_to_typeid, proxytype = \
349 self.registry[typeid]
350
351 if callable is None:
352 assert len(args) == 1 and not kwds
353 obj = args[0]
354 else:
355 obj = callable(*args, **kwds)
356
357 if exposed is None:
358 exposed = public_methods(obj)
359 if method_to_typeid is not None:
360 assert type(method_to_typeid) is dict
361 exposed = list(exposed) + list(method_to_typeid)
362
363 ident = '%x' % id(obj) # convert to string because xmlrpclib
364 # only has 32 bit signed integers
365 util.debug('%r callable returned object with id %r', typeid, ident)
366
367 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
368 if ident not in self.id_to_refcount:
Jesse Noller824f4f32008-09-02 19:12:20 +0000369 self.id_to_refcount[ident] = 0
370 # increment the reference count immediately, to avoid
371 # this object being garbage collected before a Proxy
372 # object for it can be created. The caller of create()
373 # is responsible for doing a decref once the Proxy object
374 # has been created.
375 self.incref(c, ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000376 return ident, tuple(exposed)
377 finally:
378 self.mutex.release()
379
380 def get_methods(self, c, token):
381 '''
382 Return the methods of the shared object indicated by token
383 '''
384 return tuple(self.id_to_obj[token.id][1])
385
386 def accept_connection(self, c, name):
387 '''
388 Spawn a new thread to serve this connection
389 '''
Benjamin Peterson72753702008-08-18 18:09:21 +0000390 threading.current_thread().name = name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000391 c.send(('#RETURN', None))
392 self.serve_client(c)
393
394 def incref(self, c, ident):
395 self.mutex.acquire()
396 try:
Jesse Noller824f4f32008-09-02 19:12:20 +0000397 self.id_to_refcount[ident] += 1
Benjamin Petersone711caf2008-06-11 16:44:04 +0000398 finally:
399 self.mutex.release()
400
401 def decref(self, c, ident):
402 self.mutex.acquire()
403 try:
404 assert self.id_to_refcount[ident] >= 1
405 self.id_to_refcount[ident] -= 1
406 if self.id_to_refcount[ident] == 0:
407 del self.id_to_obj[ident], self.id_to_refcount[ident]
Benjamin Peterson4ac9ce42009-10-04 14:49:41 +0000408 util.debug('disposing of obj with id %r', ident)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409 finally:
410 self.mutex.release()
411
412#
413# Class to represent state of a manager
414#
415
416class State(object):
417 __slots__ = ['value']
418 INITIAL = 0
419 STARTED = 1
420 SHUTDOWN = 2
421
422#
423# Mapping from serializer name to Listener and Client types
424#
425
426listener_client = {
427 'pickle' : (connection.Listener, connection.Client),
428 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
429 }
430
431#
432# Definition of BaseManager
433#
434
435class BaseManager(object):
436 '''
437 Base class for managers
438 '''
439 _registry = {}
440 _Server = Server
441
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100442 def __init__(self, address=None, authkey=None, serializer='pickle',
443 ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000444 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100445 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000446 self._address = address # XXX not final address if eg ('', 0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100447 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000448 self._state = State()
449 self._state.value = State.INITIAL
450 self._serializer = serializer
451 self._Listener, self._Client = listener_client[serializer]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100452 self._ctx = ctx or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000453
Benjamin Petersone711caf2008-06-11 16:44:04 +0000454 def get_server(self):
455 '''
456 Return server object with serve_forever() method and address attribute
457 '''
458 assert self._state.value == State.INITIAL
459 return Server(self._registry, self._address,
460 self._authkey, self._serializer)
461
462 def connect(self):
463 '''
464 Connect manager object to the server process
465 '''
466 Listener, Client = listener_client[self._serializer]
467 conn = Client(self._address, authkey=self._authkey)
468 dispatch(conn, None, 'dummy')
469 self._state.value = State.STARTED
470
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000471 def start(self, initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000472 '''
473 Spawn a server process for this manager object
474 '''
475 assert self._state.value == State.INITIAL
476
Florent Xicluna5d1155c2011-10-28 14:45:05 +0200477 if initializer is not None and not callable(initializer):
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000478 raise TypeError('initializer must be a callable')
479
Benjamin Petersone711caf2008-06-11 16:44:04 +0000480 # pipe over which we will retrieve address of server
481 reader, writer = connection.Pipe(duplex=False)
482
483 # spawn process which runs a server
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100484 self._process = self._ctx.Process(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000485 target=type(self)._run_server,
486 args=(self._registry, self._address, self._authkey,
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000487 self._serializer, writer, initializer, initargs),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000488 )
489 ident = ':'.join(str(i) for i in self._process._identity)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000490 self._process.name = type(self).__name__ + '-' + ident
Benjamin Petersone711caf2008-06-11 16:44:04 +0000491 self._process.start()
492
493 # get address of server
494 writer.close()
495 self._address = reader.recv()
496 reader.close()
497
498 # register a finalizer
499 self._state.value = State.STARTED
500 self.shutdown = util.Finalize(
501 self, type(self)._finalize_manager,
502 args=(self._process, self._address, self._authkey,
503 self._state, self._Client),
504 exitpriority=0
505 )
506
507 @classmethod
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000508 def _run_server(cls, registry, address, authkey, serializer, writer,
509 initializer=None, initargs=()):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 '''
511 Create a server, report its address and run it
512 '''
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +0000513 if initializer is not None:
514 initializer(*initargs)
515
Benjamin Petersone711caf2008-06-11 16:44:04 +0000516 # create server
517 server = cls._Server(registry, address, authkey, serializer)
518
519 # inform parent process of the server's address
520 writer.send(server.address)
521 writer.close()
522
523 # run the manager
524 util.info('manager serving at %r', server.address)
525 server.serve_forever()
526
527 def _create(self, typeid, *args, **kwds):
528 '''
529 Create a new shared object; return the token and exposed tuple
530 '''
531 assert self._state.value == State.STARTED, 'server not yet started'
532 conn = self._Client(self._address, authkey=self._authkey)
533 try:
534 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
535 finally:
536 conn.close()
537 return Token(typeid, self._address, id), exposed
538
539 def join(self, timeout=None):
540 '''
541 Join the manager process (if it has been spawned)
542 '''
Richard Oudkerka6becaa2012-05-03 18:29:02 +0100543 if self._process is not None:
544 self._process.join(timeout)
545 if not self._process.is_alive():
546 self._process = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000547
548 def _debug_info(self):
549 '''
550 Return some info about the servers shared objects and connections
551 '''
552 conn = self._Client(self._address, authkey=self._authkey)
553 try:
554 return dispatch(conn, None, 'debug_info')
555 finally:
556 conn.close()
557
558 def _number_of_objects(self):
559 '''
560 Return the number of shared objects
561 '''
562 conn = self._Client(self._address, authkey=self._authkey)
563 try:
564 return dispatch(conn, None, 'number_of_objects')
565 finally:
566 conn.close()
567
568 def __enter__(self):
Richard Oudkerkac385712012-06-18 21:29:30 +0100569 if self._state.value == State.INITIAL:
570 self.start()
571 assert self._state.value == State.STARTED
Benjamin Petersone711caf2008-06-11 16:44:04 +0000572 return self
573
574 def __exit__(self, exc_type, exc_val, exc_tb):
575 self.shutdown()
576
577 @staticmethod
578 def _finalize_manager(process, address, authkey, state, _Client):
579 '''
580 Shutdown the manager process; will be registered as a finalizer
581 '''
582 if process.is_alive():
583 util.info('sending shutdown message to manager')
584 try:
585 conn = _Client(address, authkey=authkey)
586 try:
587 dispatch(conn, None, 'shutdown')
588 finally:
589 conn.close()
590 except Exception:
591 pass
592
Richard Oudkerk3049f122012-06-15 20:08:29 +0100593 process.join(timeout=1.0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000594 if process.is_alive():
595 util.info('manager still alive')
596 if hasattr(process, 'terminate'):
597 util.info('trying to `terminate()` manager process')
598 process.terminate()
599 process.join(timeout=0.1)
600 if process.is_alive():
601 util.info('manager still alive after terminate')
602
603 state.value = State.SHUTDOWN
604 try:
605 del BaseProxy._address_to_local[address]
606 except KeyError:
607 pass
608
609 address = property(lambda self: self._address)
610
611 @classmethod
612 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
613 method_to_typeid=None, create_method=True):
614 '''
615 Register a typeid with the manager type
616 '''
617 if '_registry' not in cls.__dict__:
618 cls._registry = cls._registry.copy()
619
620 if proxytype is None:
621 proxytype = AutoProxy
622
623 exposed = exposed or getattr(proxytype, '_exposed_', None)
624
625 method_to_typeid = method_to_typeid or \
626 getattr(proxytype, '_method_to_typeid_', None)
627
628 if method_to_typeid:
629 for key, value in list(method_to_typeid.items()):
630 assert type(key) is str, '%r is not a string' % key
631 assert type(value) is str, '%r is not a string' % value
632
633 cls._registry[typeid] = (
634 callable, exposed, method_to_typeid, proxytype
635 )
636
637 if create_method:
638 def temp(self, *args, **kwds):
639 util.debug('requesting creation of a shared %r object', typeid)
640 token, exp = self._create(typeid, *args, **kwds)
641 proxy = proxytype(
642 token, self._serializer, manager=self,
643 authkey=self._authkey, exposed=exp
644 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000645 conn = self._Client(token.address, authkey=self._authkey)
646 dispatch(conn, None, 'decref', (token.id,))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000647 return proxy
648 temp.__name__ = typeid
649 setattr(cls, typeid, temp)
650
651#
652# Subclass of set which get cleared after a fork
653#
654
655class ProcessLocalSet(set):
656 def __init__(self):
657 util.register_after_fork(self, lambda obj: obj.clear())
658 def __reduce__(self):
659 return type(self), ()
660
661#
662# Definition of BaseProxy
663#
664
665class BaseProxy(object):
666 '''
667 A base for proxies of shared objects
668 '''
669 _address_to_local = {}
670 _mutex = util.ForkAwareThreadLock()
671
672 def __init__(self, token, serializer, manager=None,
673 authkey=None, exposed=None, incref=True):
674 BaseProxy._mutex.acquire()
675 try:
676 tls_idset = BaseProxy._address_to_local.get(token.address, None)
677 if tls_idset is None:
678 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
679 BaseProxy._address_to_local[token.address] = tls_idset
680 finally:
681 BaseProxy._mutex.release()
682
683 # self._tls is used to record the connection used by this
684 # thread to communicate with the manager at token.address
685 self._tls = tls_idset[0]
686
687 # self._idset is used to record the identities of all shared
688 # objects for which the current process owns references and
689 # which are in the manager at token.address
690 self._idset = tls_idset[1]
691
692 self._token = token
693 self._id = self._token.id
694 self._manager = manager
695 self._serializer = serializer
696 self._Client = listener_client[serializer][1]
697
698 if authkey is not None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100699 self._authkey = process.AuthenticationString(authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000700 elif self._manager is not None:
701 self._authkey = self._manager._authkey
702 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100703 self._authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000704
705 if incref:
706 self._incref()
707
708 util.register_after_fork(self, BaseProxy._after_fork)
709
710 def _connect(self):
711 util.debug('making connection to manager')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100712 name = process.current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000713 if threading.current_thread().name != 'MainThread':
714 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000715 conn = self._Client(self._token.address, authkey=self._authkey)
716 dispatch(conn, None, 'accept_connection', (name,))
717 self._tls.connection = conn
718
719 def _callmethod(self, methodname, args=(), kwds={}):
720 '''
721 Try to call a method of the referrent and return a copy of the result
722 '''
723 try:
724 conn = self._tls.connection
725 except AttributeError:
726 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000727 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000728 self._connect()
729 conn = self._tls.connection
730
731 conn.send((self._id, methodname, args, kwds))
732 kind, result = conn.recv()
733
734 if kind == '#RETURN':
735 return result
736 elif kind == '#PROXY':
737 exposed, token = result
738 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerke3e8bcf2013-07-02 13:37:43 +0100739 token.address = self._token.address
Jesse Noller824f4f32008-09-02 19:12:20 +0000740 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000741 token, self._serializer, manager=self._manager,
742 authkey=self._authkey, exposed=exposed
743 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000744 conn = self._Client(token.address, authkey=self._authkey)
745 dispatch(conn, None, 'decref', (token.id,))
746 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000747 raise convert_to_error(kind, result)
748
749 def _getvalue(self):
750 '''
751 Get a copy of the value of the referent
752 '''
753 return self._callmethod('#GETVALUE')
754
755 def _incref(self):
756 conn = self._Client(self._token.address, authkey=self._authkey)
757 dispatch(conn, None, 'incref', (self._id,))
758 util.debug('INCREF %r', self._token.id)
759
760 self._idset.add(self._id)
761
762 state = self._manager and self._manager._state
763
764 self._close = util.Finalize(
765 self, BaseProxy._decref,
766 args=(self._token, self._authkey, state,
767 self._tls, self._idset, self._Client),
768 exitpriority=10
769 )
770
771 @staticmethod
772 def _decref(token, authkey, state, tls, idset, _Client):
773 idset.discard(token.id)
774
775 # check whether manager is still alive
776 if state is None or state.value == State.STARTED:
777 # tell manager this process no longer cares about referent
778 try:
779 util.debug('DECREF %r', token.id)
780 conn = _Client(token.address, authkey=authkey)
781 dispatch(conn, None, 'decref', (token.id,))
782 except Exception as e:
783 util.debug('... decref failed %s', e)
784
785 else:
786 util.debug('DECREF %r -- manager already shutdown', token.id)
787
788 # check whether we can close this thread's connection because
789 # the process owns no more references to objects for this manager
790 if not idset and hasattr(tls, 'connection'):
791 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000792 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000793 tls.connection.close()
794 del tls.connection
795
796 def _after_fork(self):
797 self._manager = None
798 try:
799 self._incref()
800 except Exception as e:
801 # the proxy may just be for a manager which has shutdown
802 util.info('incref failed: %s' % e)
803
804 def __reduce__(self):
805 kwds = {}
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100806 if context.get_spawning_popen() is not None:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000807 kwds['authkey'] = self._authkey
808
809 if getattr(self, '_isauto', False):
810 kwds['exposed'] = self._exposed_
811 return (RebuildProxy,
812 (AutoProxy, self._token, self._serializer, kwds))
813 else:
814 return (RebuildProxy,
815 (type(self), self._token, self._serializer, kwds))
816
817 def __deepcopy__(self, memo):
818 return self._getvalue()
819
820 def __repr__(self):
821 return '<%s object, typeid %r at %s>' % \
822 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
823
824 def __str__(self):
825 '''
826 Return representation of the referent (or a fall-back if that fails)
827 '''
828 try:
829 return self._callmethod('__repr__')
830 except Exception:
831 return repr(self)[:-1] + "; '__str__()' failed>"
832
833#
834# Function used for unpickling
835#
836
837def RebuildProxy(func, token, serializer, kwds):
838 '''
839 Function used for unpickling proxy objects.
840
841 If possible the shared object is returned, or otherwise a proxy for it.
842 '''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100843 server = getattr(process.current_process(), '_manager_server', None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000844
845 if server and server.address == token.address:
846 return server.id_to_obj[token.id][0]
847 else:
848 incref = (
849 kwds.pop('incref', True) and
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100850 not getattr(process.current_process(), '_inheriting', False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000851 )
852 return func(token, serializer, incref=incref, **kwds)
853
854#
855# Functions to create proxies and proxy types
856#
857
858def MakeProxyType(name, exposed, _cache={}):
859 '''
860 Return an proxy type whose methods are given by `exposed`
861 '''
862 exposed = tuple(exposed)
863 try:
864 return _cache[(name, exposed)]
865 except KeyError:
866 pass
867
868 dic = {}
869
870 for meth in exposed:
871 exec('''def %s(self, *args, **kwds):
872 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
873
874 ProxyType = type(name, (BaseProxy,), dic)
875 ProxyType._exposed_ = exposed
876 _cache[(name, exposed)] = ProxyType
877 return ProxyType
878
879
880def AutoProxy(token, serializer, manager=None, authkey=None,
881 exposed=None, incref=True):
882 '''
883 Return an auto-proxy for `token`
884 '''
885 _Client = listener_client[serializer][1]
886
887 if exposed is None:
888 conn = _Client(token.address, authkey=authkey)
889 try:
890 exposed = dispatch(conn, None, 'get_methods', (token,))
891 finally:
892 conn.close()
893
894 if authkey is None and manager is not None:
895 authkey = manager._authkey
896 if authkey is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100897 authkey = process.current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898
899 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
900 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
901 incref=incref)
902 proxy._isauto = True
903 return proxy
904
905#
906# Types/callables which we will register with SyncManager
907#
908
909class Namespace(object):
910 def __init__(self, **kwds):
911 self.__dict__.update(kwds)
912 def __repr__(self):
913 items = list(self.__dict__.items())
914 temp = []
915 for name, value in items:
916 if not name.startswith('_'):
917 temp.append('%s=%r' % (name, value))
918 temp.sort()
919 return 'Namespace(%s)' % str.join(', ', temp)
920
921class Value(object):
922 def __init__(self, typecode, value, lock=True):
923 self._typecode = typecode
924 self._value = value
925 def get(self):
926 return self._value
927 def set(self, value):
928 self._value = value
929 def __repr__(self):
930 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
931 value = property(get, set)
932
933def Array(typecode, sequence, lock=True):
934 return array.array(typecode, sequence)
935
936#
937# Proxy types used by SyncManager
938#
939
940class IteratorProxy(BaseProxy):
Benjamin Petersond61438a2008-06-25 13:04:48 +0000941 _exposed_ = ('__next__', 'send', 'throw', 'close')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942 def __iter__(self):
943 return self
944 def __next__(self, *args):
945 return self._callmethod('__next__', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000946 def send(self, *args):
947 return self._callmethod('send', args)
948 def throw(self, *args):
949 return self._callmethod('throw', args)
950 def close(self, *args):
951 return self._callmethod('close', args)
952
953
954class AcquirerProxy(BaseProxy):
955 _exposed_ = ('acquire', 'release')
Richard Oudkerk41eb85b2012-05-06 16:45:02 +0100956 def acquire(self, blocking=True, timeout=None):
957 args = (blocking,) if timeout is None else (blocking, timeout)
958 return self._callmethod('acquire', args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000959 def release(self):
960 return self._callmethod('release')
961 def __enter__(self):
962 return self._callmethod('acquire')
963 def __exit__(self, exc_type, exc_val, exc_tb):
964 return self._callmethod('release')
965
966
967class ConditionProxy(AcquirerProxy):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000968 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000969 def wait(self, timeout=None):
970 return self._callmethod('wait', (timeout,))
971 def notify(self):
972 return self._callmethod('notify')
973 def notify_all(self):
Benjamin Peterson672b8032008-06-11 19:14:14 +0000974 return self._callmethod('notify_all')
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200975 def wait_for(self, predicate, timeout=None):
976 result = predicate()
977 if result:
978 return result
979 if timeout is not None:
980 endtime = _time() + timeout
981 else:
982 endtime = None
983 waittime = None
984 while not result:
985 if endtime is not None:
986 waittime = endtime - _time()
987 if waittime <= 0:
988 break
989 self.wait(waittime)
990 result = predicate()
991 return result
992
Benjamin Petersone711caf2008-06-11 16:44:04 +0000993
994class EventProxy(BaseProxy):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000995 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000996 def is_set(self):
Benjamin Peterson04f7d532008-06-12 17:02:47 +0000997 return self._callmethod('is_set')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000998 def set(self):
999 return self._callmethod('set')
1000 def clear(self):
1001 return self._callmethod('clear')
1002 def wait(self, timeout=None):
1003 return self._callmethod('wait', (timeout,))
1004
Richard Oudkerk3730a172012-06-15 18:26:07 +01001005
1006class BarrierProxy(BaseProxy):
1007 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1008 def wait(self, timeout=None):
1009 return self._callmethod('wait', (timeout,))
1010 def abort(self):
1011 return self._callmethod('abort')
1012 def reset(self):
1013 return self._callmethod('reset')
1014 @property
1015 def parties(self):
1016 return self._callmethod('__getattribute__', ('parties',))
1017 @property
1018 def n_waiting(self):
1019 return self._callmethod('__getattribute__', ('n_waiting',))
1020 @property
1021 def broken(self):
1022 return self._callmethod('__getattribute__', ('broken',))
1023
1024
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025class NamespaceProxy(BaseProxy):
1026 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1027 def __getattr__(self, key):
1028 if key[0] == '_':
1029 return object.__getattribute__(self, key)
1030 callmethod = object.__getattribute__(self, '_callmethod')
1031 return callmethod('__getattribute__', (key,))
1032 def __setattr__(self, key, value):
1033 if key[0] == '_':
1034 return object.__setattr__(self, key, value)
1035 callmethod = object.__getattribute__(self, '_callmethod')
1036 return callmethod('__setattr__', (key, value))
1037 def __delattr__(self, key):
1038 if key[0] == '_':
1039 return object.__delattr__(self, key)
1040 callmethod = object.__getattribute__(self, '_callmethod')
1041 return callmethod('__delattr__', (key,))
1042
1043
1044class ValueProxy(BaseProxy):
1045 _exposed_ = ('get', 'set')
1046 def get(self):
1047 return self._callmethod('get')
1048 def set(self, value):
1049 return self._callmethod('set', (value,))
1050 value = property(get, set)
1051
1052
1053BaseListProxy = MakeProxyType('BaseListProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001054 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1055 '__mul__', '__reversed__', '__rmul__', '__setitem__',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001056 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1057 'reverse', 'sort', '__imul__'
Richard Oudkerk1074a922012-05-29 12:01:45 +01001058 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001059class ListProxy(BaseListProxy):
1060 def __iadd__(self, value):
1061 self._callmethod('extend', (value,))
1062 return self
1063 def __imul__(self, value):
1064 self._callmethod('__imul__', (value,))
1065 return self
1066
1067
1068DictProxy = MakeProxyType('DictProxy', (
1069 '__contains__', '__delitem__', '__getitem__', '__len__',
1070 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1071 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1072 ))
1073
1074
1075ArrayProxy = MakeProxyType('ArrayProxy', (
Richard Oudkerk1074a922012-05-29 12:01:45 +01001076 '__len__', '__getitem__', '__setitem__'
1077 ))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001078
1079
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001080BasePoolProxy = MakeProxyType('PoolProxy', (
Benjamin Petersone711caf2008-06-11 16:44:04 +00001081 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001082 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001083 ))
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001084BasePoolProxy._method_to_typeid_ = {
Benjamin Petersone711caf2008-06-11 16:44:04 +00001085 'apply_async': 'AsyncResult',
1086 'map_async': 'AsyncResult',
Antoine Pitroude911b22011-12-21 11:03:24 +01001087 'starmap_async': 'AsyncResult',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088 'imap': 'Iterator',
1089 'imap_unordered': 'Iterator'
1090 }
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001091class PoolProxy(BasePoolProxy):
1092 def __enter__(self):
1093 return self
1094 def __exit__(self, exc_type, exc_val, exc_tb):
1095 self.terminate()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001096
1097#
1098# Definition of SyncManager
1099#
1100
1101class SyncManager(BaseManager):
1102 '''
1103 Subclass of `BaseManager` which supports a number of shared object types.
1104
1105 The types registered are those intended for the synchronization
1106 of threads, plus `dict`, `list` and `Namespace`.
1107
1108 The `multiprocessing.Manager()` function creates started instances of
1109 this class.
1110 '''
1111
1112SyncManager.register('Queue', queue.Queue)
1113SyncManager.register('JoinableQueue', queue.Queue)
1114SyncManager.register('Event', threading.Event, EventProxy)
1115SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1116SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1117SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1118SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1119 AcquirerProxy)
1120SyncManager.register('Condition', threading.Condition, ConditionProxy)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001121SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001122SyncManager.register('Pool', pool.Pool, PoolProxy)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001123SyncManager.register('list', list, ListProxy)
1124SyncManager.register('dict', dict, DictProxy)
1125SyncManager.register('Value', Value, ValueProxy)
1126SyncManager.register('Array', Array, ArrayProxy)
1127SyncManager.register('Namespace', Namespace, NamespaceProxy)
1128
1129# types returned by methods of PoolProxy
1130SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1131SyncManager.register('AsyncResult', create_method=False)