blob: 78518fd522c7518ade3c23226b6e87d7a4a5d3d9 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
R. David Murray79af2452010-12-14 01:42:40 +00007# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15# notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17# notice, this list of conditions and the following disclaimer in the
18# documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20# used to endorse or promote products derived from this software
21# without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000034#
35
36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
38#
39# Imports
40#
41
42import os
43import sys
44import weakref
45import threading
46import array
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000047import Queue
48
49from traceback import format_exc
50from multiprocessing import Process, current_process, active_children, Pool, util, connection
51from multiprocessing.process import AuthenticationString
Jesse Noller13e9d582008-07-16 14:32:36 +000052from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000053from multiprocessing.util import Finalize, info
54
55try:
56 from cPickle import PicklingError
57except ImportError:
58 from pickle import PicklingError
59
60#
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000061# Register some things for pickling
62#
63
64def reduce_array(a):
65 return array.array, (a.typecode, a.tostring())
Jesse Noller13e9d582008-07-16 14:32:36 +000066ForkingPickler.register(array.array, reduce_array)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000067
68view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000069
70#
71# Type for identifying shared objects
72#
73
74class Token(object):
75 '''
76 Type to uniquely indentify a shared object
77 '''
78 __slots__ = ('typeid', 'address', 'id')
79
80 def __init__(self, typeid, address, id):
81 (self.typeid, self.address, self.id) = (typeid, address, id)
82
83 def __getstate__(self):
84 return (self.typeid, self.address, self.id)
85
86 def __setstate__(self, state):
87 (self.typeid, self.address, self.id) = state
88
89 def __repr__(self):
90 return 'Token(typeid=%r, address=%r, id=%r)' % \
91 (self.typeid, self.address, self.id)
92
93#
94# Function for communication with a manager's server process
95#
96
97def dispatch(c, id, methodname, args=(), kwds={}):
98 '''
99 Send a message to manager using connection `c` and return response
100 '''
101 c.send((id, methodname, args, kwds))
102 kind, result = c.recv()
103 if kind == '#RETURN':
104 return result
105 raise convert_to_error(kind, result)
106
107def convert_to_error(kind, result):
108 if kind == '#ERROR':
109 return result
110 elif kind == '#TRACEBACK':
111 assert type(result) is str
112 return RemoteError(result)
113 elif kind == '#UNSERIALIZABLE':
114 assert type(result) is str
115 return RemoteError('Unserializable message: %s\n' % result)
116 else:
117 return ValueError('Unrecognized message type')
118
119class RemoteError(Exception):
120 def __str__(self):
121 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
122
123#
124# Functions for finding the method names of an object
125#
126
127def all_methods(obj):
128 '''
129 Return a list of names of methods of `obj`
130 '''
131 temp = []
132 for name in dir(obj):
133 func = getattr(obj, name)
134 if hasattr(func, '__call__'):
135 temp.append(name)
136 return temp
137
138def public_methods(obj):
139 '''
140 Return a list of names of methods of `obj` which do not start with '_'
141 '''
142 return [name for name in all_methods(obj) if name[0] != '_']
143
144#
145# Server which is run in a process controlled by a manager
146#
147
148class Server(object):
149 '''
150 Server class which runs in a process controlled by a manager object
151 '''
152 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
153 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
154
155 def __init__(self, registry, address, authkey, serializer):
156 assert isinstance(authkey, bytes)
157 self.registry = registry
158 self.authkey = AuthenticationString(authkey)
159 Listener, Client = listener_client[serializer]
160
161 # do authentication later
162 self.listener = Listener(address=address, backlog=5)
163 self.address = self.listener.address
164
Jesse Noller7314b382009-01-21 02:08:17 +0000165 self.id_to_obj = {'0': (None, ())}
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000166 self.id_to_refcount = {}
167 self.mutex = threading.RLock()
168 self.stop = 0
169
170 def serve_forever(self):
171 '''
172 Run the server forever
173 '''
174 current_process()._manager_server = self
175 try:
176 try:
177 while 1:
178 try:
179 c = self.listener.accept()
180 except (OSError, IOError):
181 continue
182 t = threading.Thread(target=self.handle_request, args=(c,))
Benjamin Peterson82aa2012008-08-18 18:31:58 +0000183 t.daemon = True
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000184 t.start()
185 except (KeyboardInterrupt, SystemExit):
186 pass
187 finally:
188 self.stop = 999
189 self.listener.close()
190
191 def handle_request(self, c):
192 '''
193 Handle a new connection
194 '''
195 funcname = result = request = None
196 try:
197 connection.deliver_challenge(c, self.authkey)
198 connection.answer_challenge(c, self.authkey)
199 request = c.recv()
200 ignore, funcname, args, kwds = request
201 assert funcname in self.public, '%r unrecognized' % funcname
202 func = getattr(self, funcname)
203 except Exception:
204 msg = ('#TRACEBACK', format_exc())
205 else:
206 try:
207 result = func(c, *args, **kwds)
208 except Exception:
209 msg = ('#TRACEBACK', format_exc())
210 else:
211 msg = ('#RETURN', result)
212 try:
213 c.send(msg)
214 except Exception, e:
215 try:
216 c.send(('#TRACEBACK', format_exc()))
217 except Exception:
218 pass
219 util.info('Failure to send message: %r', msg)
220 util.info(' ... request was %r', request)
221 util.info(' ... exception was %r', e)
222
223 c.close()
224
225 def serve_client(self, conn):
226 '''
227 Handle requests from the proxies in a particular process/thread
228 '''
229 util.debug('starting server thread to service %r',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000230 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000231
232 recv = conn.recv
233 send = conn.send
234 id_to_obj = self.id_to_obj
235
236 while not self.stop:
237
238 try:
239 methodname = obj = None
240 request = recv()
241 ident, methodname, args, kwds = request
242 obj, exposed, gettypeid = id_to_obj[ident]
243
244 if methodname not in exposed:
245 raise AttributeError(
246 'method %r of %r object is not in exposed=%r' %
247 (methodname, type(obj), exposed)
248 )
249
250 function = getattr(obj, methodname)
251
252 try:
253 res = function(*args, **kwds)
254 except Exception, e:
255 msg = ('#ERROR', e)
256 else:
257 typeid = gettypeid and gettypeid.get(methodname, None)
258 if typeid:
259 rident, rexposed = self.create(conn, typeid, res)
260 token = Token(typeid, self.address, rident)
261 msg = ('#PROXY', (rexposed, token))
262 else:
263 msg = ('#RETURN', res)
264
265 except AttributeError:
266 if methodname is None:
267 msg = ('#TRACEBACK', format_exc())
268 else:
269 try:
270 fallback_func = self.fallback_mapping[methodname]
271 result = fallback_func(
272 self, conn, ident, obj, *args, **kwds
273 )
274 msg = ('#RETURN', result)
275 except Exception:
276 msg = ('#TRACEBACK', format_exc())
277
278 except EOFError:
279 util.debug('got EOF -- exiting thread serving %r',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000280 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000281 sys.exit(0)
282
283 except Exception:
284 msg = ('#TRACEBACK', format_exc())
285
286 try:
287 try:
288 send(msg)
289 except Exception, e:
290 send(('#UNSERIALIZABLE', repr(msg)))
291 except Exception, e:
292 util.info('exception in thread serving %r',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000293 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000294 util.info(' ... message was %r', msg)
295 util.info(' ... exception was %r', e)
296 conn.close()
297 sys.exit(1)
298
299 def fallback_getvalue(self, conn, ident, obj):
300 return obj
301
302 def fallback_str(self, conn, ident, obj):
303 return str(obj)
304
305 def fallback_repr(self, conn, ident, obj):
306 return repr(obj)
307
308 fallback_mapping = {
309 '__str__':fallback_str,
310 '__repr__':fallback_repr,
311 '#GETVALUE':fallback_getvalue
312 }
313
314 def dummy(self, c):
315 pass
316
317 def debug_info(self, c):
318 '''
319 Return some info --- useful to spot problems with refcounting
320 '''
321 self.mutex.acquire()
322 try:
323 result = []
324 keys = self.id_to_obj.keys()
325 keys.sort()
326 for ident in keys:
Jesse Noller7314b382009-01-21 02:08:17 +0000327 if ident != '0':
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000328 result.append(' %s: refcount=%s\n %s' %
329 (ident, self.id_to_refcount[ident],
330 str(self.id_to_obj[ident][0])[:75]))
331 return '\n'.join(result)
332 finally:
333 self.mutex.release()
334
335 def number_of_objects(self, c):
336 '''
337 Number of shared objects
338 '''
Jesse Noller7314b382009-01-21 02:08:17 +0000339 return len(self.id_to_obj) - 1 # don't count ident='0'
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000340
341 def shutdown(self, c):
342 '''
343 Shutdown this process
344 '''
345 try:
346 try:
347 util.debug('manager received shutdown message')
348 c.send(('#RETURN', None))
349
350 if sys.stdout != sys.__stdout__:
351 util.debug('resetting stdout, stderr')
352 sys.stdout = sys.__stdout__
353 sys.stderr = sys.__stderr__
354
355 util._run_finalizers(0)
356
357 for p in active_children():
358 util.debug('terminating a child process of manager')
359 p.terminate()
360
361 for p in active_children():
362 util.debug('terminating a child process of manager')
363 p.join()
364
365 util._run_finalizers()
366 util.info('manager exiting with exitcode 0')
367 except:
368 import traceback
369 traceback.print_exc()
370 finally:
371 exit(0)
372
373 def create(self, c, typeid, *args, **kwds):
374 '''
375 Create a new shared object and return its id
376 '''
377 self.mutex.acquire()
378 try:
379 callable, exposed, method_to_typeid, proxytype = \
380 self.registry[typeid]
381
382 if callable is None:
383 assert len(args) == 1 and not kwds
384 obj = args[0]
385 else:
386 obj = callable(*args, **kwds)
387
388 if exposed is None:
389 exposed = public_methods(obj)
390 if method_to_typeid is not None:
391 assert type(method_to_typeid) is dict
392 exposed = list(exposed) + list(method_to_typeid)
393
394 ident = '%x' % id(obj) # convert to string because xmlrpclib
395 # only has 32 bit signed integers
396 util.debug('%r callable returned object with id %r', typeid, ident)
397
398 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
399 if ident not in self.id_to_refcount:
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000400 self.id_to_refcount[ident] = 0
401 # increment the reference count immediately, to avoid
402 # this object being garbage collected before a Proxy
403 # object for it can be created. The caller of create()
404 # is responsible for doing a decref once the Proxy object
405 # has been created.
406 self.incref(c, ident)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000407 return ident, tuple(exposed)
408 finally:
409 self.mutex.release()
410
411 def get_methods(self, c, token):
412 '''
413 Return the methods of the shared object indicated by token
414 '''
415 return tuple(self.id_to_obj[token.id][1])
416
417 def accept_connection(self, c, name):
418 '''
419 Spawn a new thread to serve this connection
420 '''
Benjamin Petersona9b22222008-08-18 18:01:43 +0000421 threading.current_thread().name = name
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000422 c.send(('#RETURN', None))
423 self.serve_client(c)
424
425 def incref(self, c, ident):
426 self.mutex.acquire()
427 try:
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000428 self.id_to_refcount[ident] += 1
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000429 finally:
430 self.mutex.release()
431
432 def decref(self, c, ident):
433 self.mutex.acquire()
434 try:
435 assert self.id_to_refcount[ident] >= 1
436 self.id_to_refcount[ident] -= 1
437 if self.id_to_refcount[ident] == 0:
438 del self.id_to_obj[ident], self.id_to_refcount[ident]
Georg Brandlc40e60e2009-09-18 09:18:27 +0000439 util.debug('disposing of obj with id %r', ident)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000440 finally:
441 self.mutex.release()
442
443#
444# Class to represent state of a manager
445#
446
447class State(object):
448 __slots__ = ['value']
449 INITIAL = 0
450 STARTED = 1
451 SHUTDOWN = 2
452
453#
454# Mapping from serializer name to Listener and Client types
455#
456
457listener_client = {
458 'pickle' : (connection.Listener, connection.Client),
459 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
460 }
461
462#
463# Definition of BaseManager
464#
465
466class BaseManager(object):
467 '''
468 Base class for managers
469 '''
470 _registry = {}
471 _Server = Server
472
473 def __init__(self, address=None, authkey=None, serializer='pickle'):
474 if authkey is None:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000475 authkey = current_process().authkey
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000476 self._address = address # XXX not final address if eg ('', 0)
477 self._authkey = AuthenticationString(authkey)
478 self._state = State()
479 self._state.value = State.INITIAL
480 self._serializer = serializer
481 self._Listener, self._Client = listener_client[serializer]
482
483 def __reduce__(self):
484 return type(self).from_address, \
485 (self._address, self._authkey, self._serializer)
486
487 def get_server(self):
488 '''
489 Return server object with serve_forever() method and address attribute
490 '''
491 assert self._state.value == State.INITIAL
492 return Server(self._registry, self._address,
493 self._authkey, self._serializer)
494
495 def connect(self):
496 '''
497 Connect manager object to the server process
498 '''
499 Listener, Client = listener_client[self._serializer]
500 conn = Client(self._address, authkey=self._authkey)
501 dispatch(conn, None, 'dummy')
502 self._state.value = State.STARTED
503
Jesse Noller7152f6d2009-04-02 05:17:26 +0000504 def start(self, initializer=None, initargs=()):
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000505 '''
506 Spawn a server process for this manager object
507 '''
508 assert self._state.value == State.INITIAL
509
Jesse Noller7152f6d2009-04-02 05:17:26 +0000510 if initializer is not None and not hasattr(initializer, '__call__'):
511 raise TypeError('initializer must be a callable')
512
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000513 # pipe over which we will retrieve address of server
514 reader, writer = connection.Pipe(duplex=False)
515
516 # spawn process which runs a server
517 self._process = Process(
518 target=type(self)._run_server,
519 args=(self._registry, self._address, self._authkey,
Jesse Noller7152f6d2009-04-02 05:17:26 +0000520 self._serializer, writer, initializer, initargs),
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000521 )
522 ident = ':'.join(str(i) for i in self._process._identity)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000523 self._process.name = type(self).__name__ + '-' + ident
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000524 self._process.start()
525
526 # get address of server
527 writer.close()
528 self._address = reader.recv()
529 reader.close()
530
531 # register a finalizer
532 self._state.value = State.STARTED
533 self.shutdown = util.Finalize(
534 self, type(self)._finalize_manager,
535 args=(self._process, self._address, self._authkey,
536 self._state, self._Client),
537 exitpriority=0
538 )
539
540 @classmethod
Jesse Noller7152f6d2009-04-02 05:17:26 +0000541 def _run_server(cls, registry, address, authkey, serializer, writer,
542 initializer=None, initargs=()):
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000543 '''
544 Create a server, report its address and run it
545 '''
Jesse Noller7152f6d2009-04-02 05:17:26 +0000546 if initializer is not None:
547 initializer(*initargs)
548
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000549 # create server
550 server = cls._Server(registry, address, authkey, serializer)
551
552 # inform parent process of the server's address
553 writer.send(server.address)
554 writer.close()
555
556 # run the manager
557 util.info('manager serving at %r', server.address)
558 server.serve_forever()
559
560 def _create(self, typeid, *args, **kwds):
561 '''
562 Create a new shared object; return the token and exposed tuple
563 '''
564 assert self._state.value == State.STARTED, 'server not yet started'
565 conn = self._Client(self._address, authkey=self._authkey)
566 try:
567 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
568 finally:
569 conn.close()
570 return Token(typeid, self._address, id), exposed
571
572 def join(self, timeout=None):
573 '''
574 Join the manager process (if it has been spawned)
575 '''
576 self._process.join(timeout)
577
578 def _debug_info(self):
579 '''
580 Return some info about the servers shared objects and connections
581 '''
582 conn = self._Client(self._address, authkey=self._authkey)
583 try:
584 return dispatch(conn, None, 'debug_info')
585 finally:
586 conn.close()
587
588 def _number_of_objects(self):
589 '''
590 Return the number of shared objects
591 '''
592 conn = self._Client(self._address, authkey=self._authkey)
593 try:
594 return dispatch(conn, None, 'number_of_objects')
595 finally:
596 conn.close()
597
598 def __enter__(self):
599 return self
600
601 def __exit__(self, exc_type, exc_val, exc_tb):
602 self.shutdown()
603
604 @staticmethod
605 def _finalize_manager(process, address, authkey, state, _Client):
606 '''
607 Shutdown the manager process; will be registered as a finalizer
608 '''
609 if process.is_alive():
610 util.info('sending shutdown message to manager')
611 try:
612 conn = _Client(address, authkey=authkey)
613 try:
614 dispatch(conn, None, 'shutdown')
615 finally:
616 conn.close()
617 except Exception:
618 pass
619
620 process.join(timeout=0.2)
621 if process.is_alive():
622 util.info('manager still alive')
623 if hasattr(process, 'terminate'):
624 util.info('trying to `terminate()` manager process')
625 process.terminate()
626 process.join(timeout=0.1)
627 if process.is_alive():
628 util.info('manager still alive after terminate')
629
630 state.value = State.SHUTDOWN
631 try:
632 del BaseProxy._address_to_local[address]
633 except KeyError:
634 pass
635
636 address = property(lambda self: self._address)
637
638 @classmethod
639 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
640 method_to_typeid=None, create_method=True):
641 '''
642 Register a typeid with the manager type
643 '''
644 if '_registry' not in cls.__dict__:
645 cls._registry = cls._registry.copy()
646
647 if proxytype is None:
648 proxytype = AutoProxy
649
650 exposed = exposed or getattr(proxytype, '_exposed_', None)
651
652 method_to_typeid = method_to_typeid or \
653 getattr(proxytype, '_method_to_typeid_', None)
654
655 if method_to_typeid:
656 for key, value in method_to_typeid.items():
657 assert type(key) is str, '%r is not a string' % key
658 assert type(value) is str, '%r is not a string' % value
659
660 cls._registry[typeid] = (
661 callable, exposed, method_to_typeid, proxytype
662 )
663
664 if create_method:
665 def temp(self, *args, **kwds):
666 util.debug('requesting creation of a shared %r object', typeid)
667 token, exp = self._create(typeid, *args, **kwds)
668 proxy = proxytype(
669 token, self._serializer, manager=self,
670 authkey=self._authkey, exposed=exp
671 )
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000672 conn = self._Client(token.address, authkey=self._authkey)
673 dispatch(conn, None, 'decref', (token.id,))
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000674 return proxy
675 temp.__name__ = typeid
676 setattr(cls, typeid, temp)
677
678#
679# Subclass of set which get cleared after a fork
680#
681
682class ProcessLocalSet(set):
683 def __init__(self):
684 util.register_after_fork(self, lambda obj: obj.clear())
685 def __reduce__(self):
686 return type(self), ()
687
688#
689# Definition of BaseProxy
690#
691
692class BaseProxy(object):
693 '''
694 A base for proxies of shared objects
695 '''
696 _address_to_local = {}
697 _mutex = util.ForkAwareThreadLock()
698
699 def __init__(self, token, serializer, manager=None,
700 authkey=None, exposed=None, incref=True):
701 BaseProxy._mutex.acquire()
702 try:
703 tls_idset = BaseProxy._address_to_local.get(token.address, None)
704 if tls_idset is None:
705 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
706 BaseProxy._address_to_local[token.address] = tls_idset
707 finally:
708 BaseProxy._mutex.release()
709
710 # self._tls is used to record the connection used by this
711 # thread to communicate with the manager at token.address
712 self._tls = tls_idset[0]
713
714 # self._idset is used to record the identities of all shared
715 # objects for which the current process owns references and
716 # which are in the manager at token.address
717 self._idset = tls_idset[1]
718
719 self._token = token
720 self._id = self._token.id
721 self._manager = manager
722 self._serializer = serializer
723 self._Client = listener_client[serializer][1]
724
725 if authkey is not None:
726 self._authkey = AuthenticationString(authkey)
727 elif self._manager is not None:
728 self._authkey = self._manager._authkey
729 else:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000730 self._authkey = current_process().authkey
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000731
732 if incref:
733 self._incref()
734
735 util.register_after_fork(self, BaseProxy._after_fork)
736
737 def _connect(self):
738 util.debug('making connection to manager')
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000739 name = current_process().name
Benjamin Petersona9b22222008-08-18 18:01:43 +0000740 if threading.current_thread().name != 'MainThread':
741 name += '|' + threading.current_thread().name
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000742 conn = self._Client(self._token.address, authkey=self._authkey)
743 dispatch(conn, None, 'accept_connection', (name,))
744 self._tls.connection = conn
745
746 def _callmethod(self, methodname, args=(), kwds={}):
747 '''
748 Try to call a method of the referrent and return a copy of the result
749 '''
750 try:
751 conn = self._tls.connection
752 except AttributeError:
753 util.debug('thread %r does not own a connection',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000754 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000755 self._connect()
756 conn = self._tls.connection
757
758 conn.send((self._id, methodname, args, kwds))
759 kind, result = conn.recv()
760
761 if kind == '#RETURN':
762 return result
763 elif kind == '#PROXY':
764 exposed, token = result
765 proxytype = self._manager._registry[token.typeid][-1]
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000766 proxy = proxytype(
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000767 token, self._serializer, manager=self._manager,
768 authkey=self._authkey, exposed=exposed
769 )
Benjamin Petersonf7feaec2008-09-01 17:10:46 +0000770 conn = self._Client(token.address, authkey=self._authkey)
771 dispatch(conn, None, 'decref', (token.id,))
772 return proxy
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000773 raise convert_to_error(kind, result)
774
775 def _getvalue(self):
776 '''
777 Get a copy of the value of the referent
778 '''
779 return self._callmethod('#GETVALUE')
780
781 def _incref(self):
782 conn = self._Client(self._token.address, authkey=self._authkey)
783 dispatch(conn, None, 'incref', (self._id,))
784 util.debug('INCREF %r', self._token.id)
785
786 self._idset.add(self._id)
787
788 state = self._manager and self._manager._state
789
790 self._close = util.Finalize(
791 self, BaseProxy._decref,
792 args=(self._token, self._authkey, state,
793 self._tls, self._idset, self._Client),
794 exitpriority=10
795 )
796
797 @staticmethod
798 def _decref(token, authkey, state, tls, idset, _Client):
799 idset.discard(token.id)
800
801 # check whether manager is still alive
802 if state is None or state.value == State.STARTED:
803 # tell manager this process no longer cares about referent
804 try:
805 util.debug('DECREF %r', token.id)
806 conn = _Client(token.address, authkey=authkey)
807 dispatch(conn, None, 'decref', (token.id,))
808 except Exception, e:
809 util.debug('... decref failed %s', e)
810
811 else:
812 util.debug('DECREF %r -- manager already shutdown', token.id)
813
814 # check whether we can close this thread's connection because
815 # the process owns no more references to objects for this manager
816 if not idset and hasattr(tls, 'connection'):
817 util.debug('thread %r has no more proxies so closing conn',
Benjamin Petersona9b22222008-08-18 18:01:43 +0000818 threading.current_thread().name)
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000819 tls.connection.close()
820 del tls.connection
821
822 def _after_fork(self):
823 self._manager = None
824 try:
825 self._incref()
826 except Exception, e:
827 # the proxy may just be for a manager which has shutdown
828 util.info('incref failed: %s' % e)
829
830 def __reduce__(self):
831 kwds = {}
832 if Popen.thread_is_spawning():
833 kwds['authkey'] = self._authkey
834
835 if getattr(self, '_isauto', False):
836 kwds['exposed'] = self._exposed_
837 return (RebuildProxy,
838 (AutoProxy, self._token, self._serializer, kwds))
839 else:
840 return (RebuildProxy,
841 (type(self), self._token, self._serializer, kwds))
842
843 def __deepcopy__(self, memo):
844 return self._getvalue()
845
846 def __repr__(self):
847 return '<%s object, typeid %r at %s>' % \
848 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
849
850 def __str__(self):
851 '''
852 Return representation of the referent (or a fall-back if that fails)
853 '''
854 try:
855 return self._callmethod('__repr__')
856 except Exception:
857 return repr(self)[:-1] + "; '__str__()' failed>"
858
859#
860# Function used for unpickling
861#
862
863def RebuildProxy(func, token, serializer, kwds):
864 '''
865 Function used for unpickling proxy objects.
866
867 If possible the shared object is returned, or otherwise a proxy for it.
868 '''
869 server = getattr(current_process(), '_manager_server', None)
870
871 if server and server.address == token.address:
872 return server.id_to_obj[token.id][0]
873 else:
874 incref = (
875 kwds.pop('incref', True) and
876 not getattr(current_process(), '_inheriting', False)
877 )
878 return func(token, serializer, incref=incref, **kwds)
879
880#
881# Functions to create proxies and proxy types
882#
883
884def MakeProxyType(name, exposed, _cache={}):
885 '''
886 Return an proxy type whose methods are given by `exposed`
887 '''
888 exposed = tuple(exposed)
889 try:
890 return _cache[(name, exposed)]
891 except KeyError:
892 pass
893
894 dic = {}
895
896 for meth in exposed:
897 exec '''def %s(self, *args, **kwds):
898 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
899
900 ProxyType = type(name, (BaseProxy,), dic)
901 ProxyType._exposed_ = exposed
902 _cache[(name, exposed)] = ProxyType
903 return ProxyType
904
905
906def AutoProxy(token, serializer, manager=None, authkey=None,
907 exposed=None, incref=True):
908 '''
909 Return an auto-proxy for `token`
910 '''
911 _Client = listener_client[serializer][1]
912
913 if exposed is None:
914 conn = _Client(token.address, authkey=authkey)
915 try:
916 exposed = dispatch(conn, None, 'get_methods', (token,))
917 finally:
918 conn.close()
919
920 if authkey is None and manager is not None:
921 authkey = manager._authkey
922 if authkey is None:
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000923 authkey = current_process().authkey
Benjamin Peterson7f03ea72008-06-13 19:20:48 +0000924
925 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
926 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
927 incref=incref)
928 proxy._isauto = True
929 return proxy
930
931#
932# Types/callables which we will register with SyncManager
933#
934
935class Namespace(object):
936 def __init__(self, **kwds):
937 self.__dict__.update(kwds)
938 def __repr__(self):
939 items = self.__dict__.items()
940 temp = []
941 for name, value in items:
942 if not name.startswith('_'):
943 temp.append('%s=%r' % (name, value))
944 temp.sort()
945 return 'Namespace(%s)' % str.join(', ', temp)
946
947class Value(object):
948 def __init__(self, typecode, value, lock=True):
949 self._typecode = typecode
950 self._value = value
951 def get(self):
952 return self._value
953 def set(self, value):
954 self._value = value
955 def __repr__(self):
956 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
957 value = property(get, set)
958
959def Array(typecode, sequence, lock=True):
960 return array.array(typecode, sequence)
961
962#
963# Proxy types used by SyncManager
964#
965
966class IteratorProxy(BaseProxy):
967 # XXX remove methods for Py3.0 and Py2.6
968 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
969 def __iter__(self):
970 return self
971 def __next__(self, *args):
972 return self._callmethod('__next__', args)
973 def next(self, *args):
974 return self._callmethod('next', args)
975 def send(self, *args):
976 return self._callmethod('send', args)
977 def throw(self, *args):
978 return self._callmethod('throw', args)
979 def close(self, *args):
980 return self._callmethod('close', args)
981
982
983class AcquirerProxy(BaseProxy):
984 _exposed_ = ('acquire', 'release')
985 def acquire(self, blocking=True):
986 return self._callmethod('acquire', (blocking,))
987 def release(self):
988 return self._callmethod('release')
989 def __enter__(self):
990 return self._callmethod('acquire')
991 def __exit__(self, exc_type, exc_val, exc_tb):
992 return self._callmethod('release')
993
994
995class ConditionProxy(AcquirerProxy):
996 # XXX will Condition.notfyAll() name be available in Py3.0?
997 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
998 def wait(self, timeout=None):
999 return self._callmethod('wait', (timeout,))
1000 def notify(self):
1001 return self._callmethod('notify')
1002 def notify_all(self):
1003 return self._callmethod('notify_all')
1004
1005class EventProxy(BaseProxy):
Benjamin Peterson80821f72008-06-26 21:29:19 +00001006 _exposed_ = ('is_set', 'set', 'clear', 'wait')
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001007 def is_set(self):
Benjamin Peterson0adfd932008-06-26 21:24:35 +00001008 return self._callmethod('is_set')
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001009 def set(self):
1010 return self._callmethod('set')
1011 def clear(self):
1012 return self._callmethod('clear')
1013 def wait(self, timeout=None):
1014 return self._callmethod('wait', (timeout,))
1015
1016class NamespaceProxy(BaseProxy):
1017 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1018 def __getattr__(self, key):
1019 if key[0] == '_':
1020 return object.__getattribute__(self, key)
1021 callmethod = object.__getattribute__(self, '_callmethod')
1022 return callmethod('__getattribute__', (key,))
1023 def __setattr__(self, key, value):
1024 if key[0] == '_':
1025 return object.__setattr__(self, key, value)
1026 callmethod = object.__getattribute__(self, '_callmethod')
1027 return callmethod('__setattr__', (key, value))
1028 def __delattr__(self, key):
1029 if key[0] == '_':
1030 return object.__delattr__(self, key)
1031 callmethod = object.__getattribute__(self, '_callmethod')
1032 return callmethod('__delattr__', (key,))
1033
1034
1035class ValueProxy(BaseProxy):
1036 _exposed_ = ('get', 'set')
1037 def get(self):
1038 return self._callmethod('get')
1039 def set(self, value):
1040 return self._callmethod('set', (value,))
1041 value = property(get, set)
1042
1043
1044BaseListProxy = MakeProxyType('BaseListProxy', (
1045 '__add__', '__contains__', '__delitem__', '__delslice__',
1046 '__getitem__', '__getslice__', '__len__', '__mul__',
1047 '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1048 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1049 'reverse', 'sort', '__imul__'
1050 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1051class ListProxy(BaseListProxy):
1052 def __iadd__(self, value):
1053 self._callmethod('extend', (value,))
1054 return self
1055 def __imul__(self, value):
1056 self._callmethod('__imul__', (value,))
1057 return self
1058
1059
1060DictProxy = MakeProxyType('DictProxy', (
1061 '__contains__', '__delitem__', '__getitem__', '__len__',
1062 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1063 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1064 ))
1065
1066
1067ArrayProxy = MakeProxyType('ArrayProxy', (
1068 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1069 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1070
1071
1072PoolProxy = MakeProxyType('PoolProxy', (
1073 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1074 'map', 'map_async', 'terminate'
1075 ))
1076PoolProxy._method_to_typeid_ = {
1077 'apply_async': 'AsyncResult',
1078 'map_async': 'AsyncResult',
1079 'imap': 'Iterator',
1080 'imap_unordered': 'Iterator'
1081 }
1082
1083#
1084# Definition of SyncManager
1085#
1086
1087class SyncManager(BaseManager):
1088 '''
1089 Subclass of `BaseManager` which supports a number of shared object types.
1090
1091 The types registered are those intended for the synchronization
1092 of threads, plus `dict`, `list` and `Namespace`.
1093
1094 The `multiprocessing.Manager()` function creates started instances of
1095 this class.
1096 '''
1097
1098SyncManager.register('Queue', Queue.Queue)
1099SyncManager.register('JoinableQueue', Queue.Queue)
1100SyncManager.register('Event', threading.Event, EventProxy)
1101SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1102SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1103SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1104SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1105 AcquirerProxy)
1106SyncManager.register('Condition', threading.Condition, ConditionProxy)
1107SyncManager.register('Pool', Pool, PoolProxy)
1108SyncManager.register('list', list, ListProxy)
1109SyncManager.register('dict', dict, DictProxy)
1110SyncManager.register('Value', Value, ValueProxy)
1111SyncManager.register('Array', Array, ArrayProxy)
1112SyncManager.register('Namespace', Namespace, NamespaceProxy)
1113
1114# types returned by methods of PoolProxy
1115SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1116SyncManager.register('AsyncResult', create_method=False)