blob: 08d35d868285c151f1c1a1a4b060d9af2db19cf1 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray79af2452010-12-14 01:42:40 +00007# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15# notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17# notice, this list of conditions and the following disclaimer in the
18# documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20# used to endorse or promote products derived from this software
21# without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000034#
35
36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
38#
39# Imports
40#
41
42import os
43import sys
44import weakref
45import threading
46import array
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000047import Queue
48
49from traceback import format_exc
50from multiprocessing import Process, current_process, active_children, Pool, util, connection
51from multiprocessing.process import AuthenticationString
Jesse Noller13e9d582008-07-16 14:32:36 +000052from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000053from multiprocessing.util import Finalize, info
54
55try:
56 from cPickle import PicklingError
57except ImportError:
58 from pickle import PicklingError
59
60#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000061# Register some things for pickling
62#
63
64def reduce_array(a):
65 return array.array, (a.typecode, a.tostring())
Jesse Noller13e9d582008-07-16 14:32:36 +000066ForkingPickler.register(array.array, reduce_array)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000067
68view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000069
70#
71# Type for identifying shared objects
72#
73
74class Token(object):
75 '''
76 Type to uniquely indentify a shared object
77 '''
78 __slots__ = ('typeid', 'address', 'id')
79
80 def __init__(self, typeid, address, id):
81 (self.typeid, self.address, self.id) = (typeid, address, id)
82
83 def __getstate__(self):
84 return (self.typeid, self.address, self.id)
85
86 def __setstate__(self, state):
87 (self.typeid, self.address, self.id) = state
88
89 def __repr__(self):
90 return 'Token(typeid=%r, address=%r, id=%r)' % \
91 (self.typeid, self.address, self.id)
92
93#
94# Function for communication with a manager's server process
95#
96
97def dispatch(c, id, methodname, args=(), kwds={}):
98 '''
99 Send a message to manager using connection `c` and return response
100 '''
101 c.send((id, methodname, args, kwds))
102 kind, result = c.recv()
103 if kind == '#RETURN':
104 return result
105 raise convert_to_error(kind, result)
106
107def convert_to_error(kind, result):
108 if kind == '#ERROR':
109 return result
110 elif kind == '#TRACEBACK':
111 assert type(result) is str
112 return RemoteError(result)
113 elif kind == '#UNSERIALIZABLE':
114 assert type(result) is str
115 return RemoteError('Unserializable message: %s\n' % result)
116 else:
117 return ValueError('Unrecognized message type')
118
119class RemoteError(Exception):
120 def __str__(self):
121 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
122
123#
124# Functions for finding the method names of an object
125#
126
127def all_methods(obj):
128 '''
129 Return a list of names of methods of `obj`
130 '''
131 temp = []
132 for name in dir(obj):
133 func = getattr(obj, name)
134 if hasattr(func, '__call__'):
135 temp.append(name)
136 return temp
137
138def public_methods(obj):
139 '''
140 Return a list of names of methods of `obj` which do not start with '_'
141 '''
142 return [name for name in all_methods(obj) if name[0] != '_']
143
144#
145# Server which is run in a process controlled by a manager
146#
147
148class Server(object):
149 '''
150 Server class which runs in a process controlled by a manager object
151 '''
152 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
153 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
154
155 def __init__(self, registry, address, authkey, serializer):
156 assert isinstance(authkey, bytes)
157 self.registry = registry
158 self.authkey = AuthenticationString(authkey)
159 Listener, Client = listener_client[serializer]
160
161 # do authentication later
Charles-François Natalib40827d2011-12-23 19:05:45 +0100162 self.listener = Listener(address=address, backlog=16)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000163 self.address = self.listener.address
164
Jesse Noller7314b382009-01-21 02:08:17 +0000165 self.id_to_obj = {'0': (None, ())}
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000166 self.id_to_refcount = {}
167 self.mutex = threading.RLock()
168 self.stop = 0
169
170 def serve_forever(self):
171 '''
172 Run the server forever
173 '''
174 current_process()._manager_server = self
175 try:
176 try:
177 while 1:
178 try:
179 c = self.listener.accept()
180 except (OSError, IOError):
181 continue
182 t = threading.Thread(target=self.handle_request, args=(c,))
Benjamin Peterson82aa2012008-08-18 18:31:58 +0000183 t.daemon = True
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000184 t.start()
185 except (KeyboardInterrupt, SystemExit):
186 pass
187 finally:
188 self.stop = 999
189 self.listener.close()
190
191 def handle_request(self, c):
192 '''
193 Handle a new connection
194 '''
195 funcname = result = request = None
196 try:
197 connection.deliver_challenge(c, self.authkey)
198 connection.answer_challenge(c, self.authkey)
199 request = c.recv()
200 ignore, funcname, args, kwds = request
201 assert funcname in self.public, '%r unrecognized' % funcname
202 func = getattr(self, funcname)
203 except Exception:
204 msg = ('#TRACEBACK', format_exc())
205 else:
206 try:
207 result = func(c, *args, **kwds)
208 except Exception:
209 msg = ('#TRACEBACK', format_exc())
210 else:
211 msg = ('#RETURN', result)
212 try:
213 c.send(msg)
214 except Exception, e:
215 try:
216 c.send(('#TRACEBACK', format_exc()))
217 except Exception:
218 pass
219 util.info('Failure to send message: %r', msg)
220 util.info(' ... request was %r', request)
221 util.info(' ... exception was %r', e)
222
223 c.close()
224
225 def serve_client(self, conn):
226 '''
227 Handle requests from the proxies in a particular process/thread
228 '''
229 util.debug('starting server thread to service %r',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000230 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000231
232 recv = conn.recv
233 send = conn.send
234 id_to_obj = self.id_to_obj
235
236 while not self.stop:
237
238 try:
239 methodname = obj = None
240 request = recv()
241 ident, methodname, args, kwds = request
242 obj, exposed, gettypeid = id_to_obj[ident]
243
244 if methodname not in exposed:
245 raise AttributeError(
246 'method %r of %r object is not in exposed=%r' %
247 (methodname, type(obj), exposed)
248 )
249
250 function = getattr(obj, methodname)
251
252 try:
253 res = function(*args, **kwds)
254 except Exception, e:
255 msg = ('#ERROR', e)
256 else:
257 typeid = gettypeid and gettypeid.get(methodname, None)
258 if typeid:
259 rident, rexposed = self.create(conn, typeid, res)
260 token = Token(typeid, self.address, rident)
261 msg = ('#PROXY', (rexposed, token))
262 else:
263 msg = ('#RETURN', res)
264
265 except AttributeError:
266 if methodname is None:
267 msg = ('#TRACEBACK', format_exc())
268 else:
269 try:
270 fallback_func = self.fallback_mapping[methodname]
271 result = fallback_func(
272 self, conn, ident, obj, *args, **kwds
273 )
274 msg = ('#RETURN', result)
275 except Exception:
276 msg = ('#TRACEBACK', format_exc())
277
278 except EOFError:
279 util.debug('got EOF -- exiting thread serving %r',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000280 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000281 sys.exit(0)
282
283 except Exception:
284 msg = ('#TRACEBACK', format_exc())
285
286 try:
287 try:
288 send(msg)
289 except Exception, e:
290 send(('#UNSERIALIZABLE', repr(msg)))
291 except Exception, e:
292 util.info('exception in thread serving %r',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000293 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000294 util.info(' ... message was %r', msg)
295 util.info(' ... exception was %r', e)
296 conn.close()
297 sys.exit(1)
298
299 def fallback_getvalue(self, conn, ident, obj):
300 return obj
301
302 def fallback_str(self, conn, ident, obj):
303 return str(obj)
304
305 def fallback_repr(self, conn, ident, obj):
306 return repr(obj)
307
308 fallback_mapping = {
309 '__str__':fallback_str,
310 '__repr__':fallback_repr,
311 '#GETVALUE':fallback_getvalue
312 }
313
314 def dummy(self, c):
315 pass
316
317 def debug_info(self, c):
318 '''
319 Return some info --- useful to spot problems with refcounting
320 '''
321 self.mutex.acquire()
322 try:
323 result = []
324 keys = self.id_to_obj.keys()
325 keys.sort()
326 for ident in keys:
Jesse Noller7314b382009-01-21 02:08:17 +0000327 if ident != '0':
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000328 result.append(' %s: refcount=%s\n %s' %
329 (ident, self.id_to_refcount[ident],
330 str(self.id_to_obj[ident][0])[:75]))
331 return '\n'.join(result)
332 finally:
333 self.mutex.release()
334
335 def number_of_objects(self, c):
336 '''
337 Number of shared objects
338 '''
Jesse Noller7314b382009-01-21 02:08:17 +0000339 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000340
341 def shutdown(self, c):
342 '''
343 Shutdown this process
344 '''
345 try:
346 try:
347 util.debug('manager received shutdown message')
348 c.send(('#RETURN', None))
349
350 if sys.stdout != sys.__stdout__:
351 util.debug('resetting stdout, stderr')
352 sys.stdout = sys.__stdout__
353 sys.stderr = sys.__stderr__
354
355 util._run_finalizers(0)
356
357 for p in active_children():
358 util.debug('terminating a child process of manager')
359 p.terminate()
360
361 for p in active_children():
362 util.debug('terminating a child process of manager')
363 p.join()
364
365 util._run_finalizers()
366 util.info('manager exiting with exitcode 0')
367 except:
368 import traceback
369 traceback.print_exc()
370 finally:
371 exit(0)
372
373 def create(self, c, typeid, *args, **kwds):
374 '''
375 Create a new shared object and return its id
376 '''
377 self.mutex.acquire()
378 try:
379 callable, exposed, method_to_typeid, proxytype = \
380 self.registry[typeid]
381
382 if callable is None:
383 assert len(args) == 1 and not kwds
384 obj = args[0]
385 else:
386 obj = callable(*args, **kwds)
387
388 if exposed is None:
389 exposed = public_methods(obj)
390 if method_to_typeid is not None:
391 assert type(method_to_typeid) is dict
392 exposed = list(exposed) + list(method_to_typeid)
393
394 ident = '%x' % id(obj) # convert to string because xmlrpclib
395 # only has 32 bit signed integers
396 util.debug('%r callable returned object with id %r', typeid, ident)
397
398 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
399 if ident not in self.id_to_refcount:
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000400 self.id_to_refcount[ident] = 0
401 # increment the reference count immediately, to avoid
402 # this object being garbage collected before a Proxy
403 # object for it can be created. The caller of create()
404 # is responsible for doing a decref once the Proxy object
405 # has been created.
406 self.incref(c, ident)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000407 return ident, tuple(exposed)
408 finally:
409 self.mutex.release()
410
411 def get_methods(self, c, token):
412 '''
413 Return the methods of the shared object indicated by token
414 '''
415 return tuple(self.id_to_obj[token.id][1])
416
417 def accept_connection(self, c, name):
418 '''
419 Spawn a new thread to serve this connection
420 '''
Benjamin Petersona9b22222008-08-18 18:01:43 +0000421 threading.current_thread().name = name
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000422 c.send(('#RETURN', None))
423 self.serve_client(c)
424
425 def incref(self, c, ident):
426 self.mutex.acquire()
427 try:
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000428 self.id_to_refcount[ident] += 1
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000429 finally:
430 self.mutex.release()
431
432 def decref(self, c, ident):
433 self.mutex.acquire()
434 try:
435 assert self.id_to_refcount[ident] >= 1
436 self.id_to_refcount[ident] -= 1
437 if self.id_to_refcount[ident] == 0:
438 del self.id_to_obj[ident], self.id_to_refcount[ident]
Georg Brandlc40e60e2009-09-18 09:18:27 +0000439 util.debug('disposing of obj with id %r', ident)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000440 finally:
441 self.mutex.release()
442
443#
444# Class to represent state of a manager
445#
446
447class State(object):
448 __slots__ = ['value']
449 INITIAL = 0
450 STARTED = 1
451 SHUTDOWN = 2
452
453#
454# Mapping from serializer name to Listener and Client types
455#
456
457listener_client = {
458 'pickle' : (connection.Listener, connection.Client),
459 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
460 }
461
462#
463# Definition of BaseManager
464#
465
466class BaseManager(object):
467 '''
468 Base class for managers
469 '''
470 _registry = {}
471 _Server = Server
472
473 def __init__(self, address=None, authkey=None, serializer='pickle'):
474 if authkey is None:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000475 authkey = current_process().authkey
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000476 self._address = address # XXX not final address if eg ('', 0)
477 self._authkey = AuthenticationString(authkey)
478 self._state = State()
479 self._state.value = State.INITIAL
480 self._serializer = serializer
481 self._Listener, self._Client = listener_client[serializer]
482
483 def __reduce__(self):
484 return type(self).from_address, \
485 (self._address, self._authkey, self._serializer)
486
487 def get_server(self):
488 '''
489 Return server object with serve_forever() method and address attribute
490 '''
491 assert self._state.value == State.INITIAL
492 return Server(self._registry, self._address,
493 self._authkey, self._serializer)
494
495 def connect(self):
496 '''
497 Connect manager object to the server process
498 '''
499 Listener, Client = listener_client[self._serializer]
500 conn = Client(self._address, authkey=self._authkey)
501 dispatch(conn, None, 'dummy')
502 self._state.value = State.STARTED
503
Jesse Noller7152f6d2009-04-02 05:17:26 +0000504 def start(self, initializer=None, initargs=()):
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000505 '''
506 Spawn a server process for this manager object
507 '''
508 assert self._state.value == State.INITIAL
509
Jesse Noller7152f6d2009-04-02 05:17:26 +0000510 if initializer is not None and not hasattr(initializer, '__call__'):
511 raise TypeError('initializer must be a callable')
512
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000513 # pipe over which we will retrieve address of server
514 reader, writer = connection.Pipe(duplex=False)
515
516 # spawn process which runs a server
517 self._process = Process(
518 target=type(self)._run_server,
519 args=(self._registry, self._address, self._authkey,
Jesse Noller7152f6d2009-04-02 05:17:26 +0000520 self._serializer, writer, initializer, initargs),
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000521 )
522 ident = ':'.join(str(i) for i in self._process._identity)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000523 self._process.name = type(self).__name__ + '-' + ident
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000524 self._process.start()
525
526 # get address of server
527 writer.close()
528 self._address = reader.recv()
529 reader.close()
530
531 # register a finalizer
532 self._state.value = State.STARTED
533 self.shutdown = util.Finalize(
534 self, type(self)._finalize_manager,
535 args=(self._process, self._address, self._authkey,
536 self._state, self._Client),
537 exitpriority=0
538 )
539
540 @classmethod
Jesse Noller7152f6d2009-04-02 05:17:26 +0000541 def _run_server(cls, registry, address, authkey, serializer, writer,
542 initializer=None, initargs=()):
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000543 '''
544 Create a server, report its address and run it
545 '''
Jesse Noller7152f6d2009-04-02 05:17:26 +0000546 if initializer is not None:
547 initializer(*initargs)
548
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000549 # create server
550 server = cls._Server(registry, address, authkey, serializer)
551
552 # inform parent process of the server's address
553 writer.send(server.address)
554 writer.close()
555
556 # run the manager
557 util.info('manager serving at %r', server.address)
558 server.serve_forever()
559
560 def _create(self, typeid, *args, **kwds):
561 '''
562 Create a new shared object; return the token and exposed tuple
563 '''
564 assert self._state.value == State.STARTED, 'server not yet started'
565 conn = self._Client(self._address, authkey=self._authkey)
566 try:
567 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
568 finally:
569 conn.close()
570 return Token(typeid, self._address, id), exposed
571
572 def join(self, timeout=None):
573 '''
574 Join the manager process (if it has been spawned)
575 '''
576 self._process.join(timeout)
577
578 def _debug_info(self):
579 '''
580 Return some info about the servers shared objects and connections
581 '''
582 conn = self._Client(self._address, authkey=self._authkey)
583 try:
584 return dispatch(conn, None, 'debug_info')
585 finally:
586 conn.close()
587
588 def _number_of_objects(self):
589 '''
590 Return the number of shared objects
591 '''
592 conn = self._Client(self._address, authkey=self._authkey)
593 try:
594 return dispatch(conn, None, 'number_of_objects')
595 finally:
596 conn.close()
597
598 def __enter__(self):
599 return self
600
601 def __exit__(self, exc_type, exc_val, exc_tb):
602 self.shutdown()
603
604 @staticmethod
605 def _finalize_manager(process, address, authkey, state, _Client):
606 '''
607 Shutdown the manager process; will be registered as a finalizer
608 '''
609 if process.is_alive():
610 util.info('sending shutdown message to manager')
611 try:
612 conn = _Client(address, authkey=authkey)
613 try:
614 dispatch(conn, None, 'shutdown')
615 finally:
616 conn.close()
617 except Exception:
618 pass
619
620 process.join(timeout=0.2)
621 if process.is_alive():
622 util.info('manager still alive')
623 if hasattr(process, 'terminate'):
624 util.info('trying to `terminate()` manager process')
625 process.terminate()
626 process.join(timeout=0.1)
627 if process.is_alive():
628 util.info('manager still alive after terminate')
629
630 state.value = State.SHUTDOWN
631 try:
632 del BaseProxy._address_to_local[address]
633 except KeyError:
634 pass
635
636 address = property(lambda self: self._address)
637
638 @classmethod
639 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
640 method_to_typeid=None, create_method=True):
641 '''
642 Register a typeid with the manager type
643 '''
644 if '_registry' not in cls.__dict__:
645 cls._registry = cls._registry.copy()
646
647 if proxytype is None:
648 proxytype = AutoProxy
649
650 exposed = exposed or getattr(proxytype, '_exposed_', None)
651
652 method_to_typeid = method_to_typeid or \
653 getattr(proxytype, '_method_to_typeid_', None)
654
655 if method_to_typeid:
656 for key, value in method_to_typeid.items():
657 assert type(key) is str, '%r is not a string' % key
658 assert type(value) is str, '%r is not a string' % value
659
660 cls._registry[typeid] = (
661 callable, exposed, method_to_typeid, proxytype
662 )
663
664 if create_method:
665 def temp(self, *args, **kwds):
666 util.debug('requesting creation of a shared %r object', typeid)
667 token, exp = self._create(typeid, *args, **kwds)
668 proxy = proxytype(
669 token, self._serializer, manager=self,
670 authkey=self._authkey, exposed=exp
671 )
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000672 conn = self._Client(token.address, authkey=self._authkey)
673 dispatch(conn, None, 'decref', (token.id,))
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000674 return proxy
675 temp.__name__ = typeid
676 setattr(cls, typeid, temp)
677
678#
679# Subclass of set which get cleared after a fork
680#
681
682class ProcessLocalSet(set):
683 def __init__(self):
684 util.register_after_fork(self, lambda obj: obj.clear())
685 def __reduce__(self):
686 return type(self), ()
687
688#
689# Definition of BaseProxy
690#
691
692class BaseProxy(object):
693 '''
694 A base for proxies of shared objects
695 '''
696 _address_to_local = {}
697 _mutex = util.ForkAwareThreadLock()
698
699 def __init__(self, token, serializer, manager=None,
700 authkey=None, exposed=None, incref=True):
701 BaseProxy._mutex.acquire()
702 try:
703 tls_idset = BaseProxy._address_to_local.get(token.address, None)
704 if tls_idset is None:
705 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
706 BaseProxy._address_to_local[token.address] = tls_idset
707 finally:
708 BaseProxy._mutex.release()
709
710 # self._tls is used to record the connection used by this
711 # thread to communicate with the manager at token.address
712 self._tls = tls_idset[0]
713
714 # self._idset is used to record the identities of all shared
715 # objects for which the current process owns references and
716 # which are in the manager at token.address
717 self._idset = tls_idset[1]
718
719 self._token = token
720 self._id = self._token.id
721 self._manager = manager
722 self._serializer = serializer
723 self._Client = listener_client[serializer][1]
724
725 if authkey is not None:
726 self._authkey = AuthenticationString(authkey)
727 elif self._manager is not None:
728 self._authkey = self._manager._authkey
729 else:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000730 self._authkey = current_process().authkey
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000731
732 if incref:
733 self._incref()
734
735 util.register_after_fork(self, BaseProxy._after_fork)
736
737 def _connect(self):
738 util.debug('making connection to manager')
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000739 name = current_process().name
Benjamin Petersona9b22222008-08-18 18:01:43 +0000740 if threading.current_thread().name != 'MainThread':
741 name += '|' + threading.current_thread().name
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000742 conn = self._Client(self._token.address, authkey=self._authkey)
743 dispatch(conn, None, 'accept_connection', (name,))
744 self._tls.connection = conn
745
746 def _callmethod(self, methodname, args=(), kwds={}):
747 '''
748 Try to call a method of the referrent and return a copy of the result
749 '''
750 try:
751 conn = self._tls.connection
752 except AttributeError:
753 util.debug('thread %r does not own a connection',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000754 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000755 self._connect()
756 conn = self._tls.connection
757
758 conn.send((self._id, methodname, args, kwds))
759 kind, result = conn.recv()
760
761 if kind == '#RETURN':
762 return result
763 elif kind == '#PROXY':
764 exposed, token = result
765 proxytype = self._manager._registry[token.typeid][-1]
Richard Oudkerk1e462fe2013-07-02 13:31:43 +0100766 token.address = self._token.address
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000767 proxy = proxytype(
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000768 token, self._serializer, manager=self._manager,
769 authkey=self._authkey, exposed=exposed
770 )
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000771 conn = self._Client(token.address, authkey=self._authkey)
772 dispatch(conn, None, 'decref', (token.id,))
773 return proxy
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000774 raise convert_to_error(kind, result)
775
776 def _getvalue(self):
777 '''
778 Get a copy of the value of the referent
779 '''
780 return self._callmethod('#GETVALUE')
781
782 def _incref(self):
783 conn = self._Client(self._token.address, authkey=self._authkey)
784 dispatch(conn, None, 'incref', (self._id,))
785 util.debug('INCREF %r', self._token.id)
786
787 self._idset.add(self._id)
788
789 state = self._manager and self._manager._state
790
791 self._close = util.Finalize(
792 self, BaseProxy._decref,
793 args=(self._token, self._authkey, state,
794 self._tls, self._idset, self._Client),
795 exitpriority=10
796 )
797
798 @staticmethod
799 def _decref(token, authkey, state, tls, idset, _Client):
800 idset.discard(token.id)
801
802 # check whether manager is still alive
803 if state is None or state.value == State.STARTED:
804 # tell manager this process no longer cares about referent
805 try:
806 util.debug('DECREF %r', token.id)
807 conn = _Client(token.address, authkey=authkey)
808 dispatch(conn, None, 'decref', (token.id,))
809 except Exception, e:
810 util.debug('... decref failed %s', e)
811
812 else:
813 util.debug('DECREF %r -- manager already shutdown', token.id)
814
815 # check whether we can close this thread's connection because
816 # the process owns no more references to objects for this manager
817 if not idset and hasattr(tls, 'connection'):
818 util.debug('thread %r has no more proxies so closing conn',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000819 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000820 tls.connection.close()
821 del tls.connection
822
823 def _after_fork(self):
824 self._manager = None
825 try:
826 self._incref()
827 except Exception, e:
828 # the proxy may just be for a manager which has shutdown
829 util.info('incref failed: %s' % e)
830
831 def __reduce__(self):
832 kwds = {}
833 if Popen.thread_is_spawning():
834 kwds['authkey'] = self._authkey
835
836 if getattr(self, '_isauto', False):
837 kwds['exposed'] = self._exposed_
838 return (RebuildProxy,
839 (AutoProxy, self._token, self._serializer, kwds))
840 else:
841 return (RebuildProxy,
842 (type(self), self._token, self._serializer, kwds))
843
844 def __deepcopy__(self, memo):
845 return self._getvalue()
846
847 def __repr__(self):
848 return '<%s object, typeid %r at %s>' % \
849 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
850
851 def __str__(self):
852 '''
853 Return representation of the referent (or a fall-back if that fails)
854 '''
855 try:
856 return self._callmethod('__repr__')
857 except Exception:
858 return repr(self)[:-1] + "; '__str__()' failed>"
859
860#
861# Function used for unpickling
862#
863
864def RebuildProxy(func, token, serializer, kwds):
865 '''
866 Function used for unpickling proxy objects.
867
868 If possible the shared object is returned, or otherwise a proxy for it.
869 '''
870 server = getattr(current_process(), '_manager_server', None)
871
872 if server and server.address == token.address:
873 return server.id_to_obj[token.id][0]
874 else:
875 incref = (
876 kwds.pop('incref', True) and
877 not getattr(current_process(), '_inheriting', False)
878 )
879 return func(token, serializer, incref=incref, **kwds)
880
881#
882# Functions to create proxies and proxy types
883#
884
885def MakeProxyType(name, exposed, _cache={}):
886 '''
887 Return an proxy type whose methods are given by `exposed`
888 '''
889 exposed = tuple(exposed)
890 try:
891 return _cache[(name, exposed)]
892 except KeyError:
893 pass
894
895 dic = {}
896
897 for meth in exposed:
898 exec '''def %s(self, *args, **kwds):
899 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
900
901 ProxyType = type(name, (BaseProxy,), dic)
902 ProxyType._exposed_ = exposed
903 _cache[(name, exposed)] = ProxyType
904 return ProxyType
905
906
907def AutoProxy(token, serializer, manager=None, authkey=None,
908 exposed=None, incref=True):
909 '''
910 Return an auto-proxy for `token`
911 '''
912 _Client = listener_client[serializer][1]
913
914 if exposed is None:
915 conn = _Client(token.address, authkey=authkey)
916 try:
917 exposed = dispatch(conn, None, 'get_methods', (token,))
918 finally:
919 conn.close()
920
921 if authkey is None and manager is not None:
922 authkey = manager._authkey
923 if authkey is None:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000924 authkey = current_process().authkey
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000925
926 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
927 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
928 incref=incref)
929 proxy._isauto = True
930 return proxy
931
932#
933# Types/callables which we will register with SyncManager
934#
935
936class Namespace(object):
937 def __init__(self, **kwds):
938 self.__dict__.update(kwds)
939 def __repr__(self):
940 items = self.__dict__.items()
941 temp = []
942 for name, value in items:
943 if not name.startswith('_'):
944 temp.append('%s=%r' % (name, value))
945 temp.sort()
946 return 'Namespace(%s)' % str.join(', ', temp)
947
948class Value(object):
949 def __init__(self, typecode, value, lock=True):
950 self._typecode = typecode
951 self._value = value
952 def get(self):
953 return self._value
954 def set(self, value):
955 self._value = value
956 def __repr__(self):
957 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
958 value = property(get, set)
959
960def Array(typecode, sequence, lock=True):
961 return array.array(typecode, sequence)
962
963#
964# Proxy types used by SyncManager
965#
966
967class IteratorProxy(BaseProxy):
968 # XXX remove methods for Py3.0 and Py2.6
969 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
970 def __iter__(self):
971 return self
972 def __next__(self, *args):
973 return self._callmethod('__next__', args)
974 def next(self, *args):
975 return self._callmethod('next', args)
976 def send(self, *args):
977 return self._callmethod('send', args)
978 def throw(self, *args):
979 return self._callmethod('throw', args)
980 def close(self, *args):
981 return self._callmethod('close', args)
982
983
984class AcquirerProxy(BaseProxy):
985 _exposed_ = ('acquire', 'release')
986 def acquire(self, blocking=True):
987 return self._callmethod('acquire', (blocking,))
988 def release(self):
989 return self._callmethod('release')
990 def __enter__(self):
991 return self._callmethod('acquire')
992 def __exit__(self, exc_type, exc_val, exc_tb):
993 return self._callmethod('release')
994
995
996class ConditionProxy(AcquirerProxy):
997 # XXX will Condition.notfyAll() name be available in Py3.0?
998 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
999 def wait(self, timeout=None):
1000 return self._callmethod('wait', (timeout,))
1001 def notify(self):
1002 return self._callmethod('notify')
1003 def notify_all(self):
1004 return self._callmethod('notify_all')
1005
1006class EventProxy(BaseProxy):
Benjamin Peterson80821f72008-06-26 21:29:19 +00001007 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001008 def is_set(self):
Benjamin Peterson0adfd932008-06-26 21:24:35 +00001009 return self._callmethod('is_set')
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001010 def set(self):
1011 return self._callmethod('set')
1012 def clear(self):
1013 return self._callmethod('clear')
1014 def wait(self, timeout=None):
1015 return self._callmethod('wait', (timeout,))
1016
1017class NamespaceProxy(BaseProxy):
1018 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1019 def __getattr__(self, key):
1020 if key[0] == '_':
1021 return object.__getattribute__(self, key)
1022 callmethod = object.__getattribute__(self, '_callmethod')
1023 return callmethod('__getattribute__', (key,))
1024 def __setattr__(self, key, value):
1025 if key[0] == '_':
1026 return object.__setattr__(self, key, value)
1027 callmethod = object.__getattribute__(self, '_callmethod')
1028 return callmethod('__setattr__', (key, value))
1029 def __delattr__(self, key):
1030 if key[0] == '_':
1031 return object.__delattr__(self, key)
1032 callmethod = object.__getattribute__(self, '_callmethod')
1033 return callmethod('__delattr__', (key,))
1034
1035
1036class ValueProxy(BaseProxy):
1037 _exposed_ = ('get', 'set')
1038 def get(self):
1039 return self._callmethod('get')
1040 def set(self, value):
1041 return self._callmethod('set', (value,))
1042 value = property(get, set)
1043
1044
1045BaseListProxy = MakeProxyType('BaseListProxy', (
1046 '__add__', '__contains__', '__delitem__', '__delslice__',
1047 '__getitem__', '__getslice__', '__len__', '__mul__',
1048 '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1049 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1050 'reverse', 'sort', '__imul__'
1051 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1052class ListProxy(BaseListProxy):
1053 def __iadd__(self, value):
1054 self._callmethod('extend', (value,))
1055 return self
1056 def __imul__(self, value):
1057 self._callmethod('__imul__', (value,))
1058 return self
1059
1060
1061DictProxy = MakeProxyType('DictProxy', (
1062 '__contains__', '__delitem__', '__getitem__', '__len__',
1063 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1064 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1065 ))
1066
1067
1068ArrayProxy = MakeProxyType('ArrayProxy', (
1069 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1070 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1071
1072
1073PoolProxy = MakeProxyType('PoolProxy', (
1074 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1075 'map', 'map_async', 'terminate'
1076 ))
1077PoolProxy._method_to_typeid_ = {
1078 'apply_async': 'AsyncResult',
1079 'map_async': 'AsyncResult',
1080 'imap': 'Iterator',
1081 'imap_unordered': 'Iterator'
1082 }
1083
1084#
1085# Definition of SyncManager
1086#
1087
1088class SyncManager(BaseManager):
1089 '''
1090 Subclass of `BaseManager` which supports a number of shared object types.
1091
1092 The types registered are those intended for the synchronization
1093 of threads, plus `dict`, `list` and `Namespace`.
1094
1095 The `multiprocessing.Manager()` function creates started instances of
1096 this class.
1097 '''
1098
1099SyncManager.register('Queue', Queue.Queue)
1100SyncManager.register('JoinableQueue', Queue.Queue)
1101SyncManager.register('Event', threading.Event, EventProxy)
1102SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1103SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1104SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1105SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1106 AcquirerProxy)
1107SyncManager.register('Condition', threading.Condition, ConditionProxy)
1108SyncManager.register('Pool', Pool, PoolProxy)
1109SyncManager.register('list', list, ListProxy)
1110SyncManager.register('dict', dict, DictProxy)
1111SyncManager.register('Value', Value, ValueProxy)
1112SyncManager.register('Array', Array, ArrayProxy)
1113SyncManager.register('Namespace', Namespace, NamespaceProxy)
1114
1115# types returned by methods of PoolProxy
1116SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1117SyncManager.register('AsyncResult', create_method=False)