blob: 5588ead1169c392a7a759350ffb7fa5e1035875a [file] [log] [blame]
#
# Module providing the `SyncManager` class for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
35
36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
#
# Imports
#
41
42import os
43import sys
44import weakref
45import threading
46import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000047import queue
48
49from traceback import format_exc
Jesse Nollerf70a5382009-01-18 19:44:02 +000050from pickle import PicklingError
Benjamin Petersone711caf2008-06-11 16:44:04 +000051from multiprocessing import Process, current_process, active_children, Pool, util, connection
52from multiprocessing.process import AuthenticationString
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000053from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000054from multiprocessing.util import Finalize, info
55
#
# Register some things for pickling
#
59
def reduce_array(a):
    '''
    Reduction function for `array.array` objects

    Returns the `(callable, args)` pair from which an equal array can be
    rebuilt: the array type itself plus the typecode and raw bytes of `a`.
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
ForkingPickler.register(array.array, reduce_array)

# Dictionary views are registered so that they get pickled as plain lists.
view_types = [type(view()) for view in ({}.items, {}.keys, {}.values)]
if view_types[0] is not list:       # only needed in Py3.0
    import copyreg

    def rebuild_as_list(obj):
        return list, (list(obj),)

    # register the reduction both with ForkingPickler and with copyreg
    for view_type in view_types:
        ForkingPickler.register(view_type, rebuild_as_list)
        copyreg.pickle(view_type, rebuild_as_list)
Benjamin Petersone711caf2008-06-11 16:44:04 +000072
#
# Type for identifying shared objects
#
76
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token bundles the `typeid` the object was registered under, the
    `address` of the manager holding it and the `id` of the object there.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # __slots__ classes have no __dict__, so state is an explicit tuple
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.typeid, self.address, self.id)
        return 'Token(typeid=%r, address=%r, id=%r)' % fields
95
#
# Function for communication with a manager's server process
#
99
def dispatch(c, id, methodname, args=(), kwds=None):
    '''
    Send a message to manager using connection `c` and return response

    The request `(id, methodname, args, kwds)` is sent over `c` and a
    `(kind, result)` reply is read back.  `result` is returned when `kind`
    is '#RETURN'; any other kind is turned into an exception by
    `convert_to_error()` and raised.

    `kwds` defaults to a fresh empty dict for every call, so the object
    handed to `c.send()` is never shared between calls.
    '''
    if kwds is None:
        kwds = {}
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)
109
def convert_to_error(kind, result):
    '''
    Return the exception object described by a `(kind, result)` reply

    '#ERROR' replies already carry an exception, which is returned as is;
    '#TRACEBACK' and '#UNSERIALIZABLE' replies carry a string which is
    wrapped in a `RemoteError`; anything else yields a `ValueError`.
    '''
    if kind == '#ERROR':
        return result
    if kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        assert type(result) is str
        if kind == '#TRACEBACK':
            return RemoteError(result)
        return RemoteError('Unserializable message: %s\n' % result)
    return ValueError('Unrecognized message type')
121
class RemoteError(Exception):
    '''
    Exception whose first argument is text reported by the manager process
    '''
    def __str__(self):
        # frame the text between two 75 character rules
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
125
#
# Functions for finding the method names of an object
#
129
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    Every name reported by `dir(obj)` whose attribute is callable is
    included, in the order `dir()` produced it.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
140
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    names = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        names.append(name)
    return names
146
#
# Server which is run in a process controlled by a manager
#
150
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    Shared objects are kept in `id_to_obj`, keyed by an identifier string,
    with a matching reference count in `id_to_refcount`.  Both mappings are
    updated while holding `mutex`.
    '''
    # names of the methods which may be invoked through handle_request()
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident '0' is a placeholder entry which refers to no object
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

    def serve_forever(self):
        '''
        Run the server forever

        Each accepted connection is handed to `handle_request()` in its own
        daemon thread.  The loop ends on KeyboardInterrupt or SystemExit,
        after which `stop` is set and the listener is closed.
        '''
        current_process()._manager_server = self
        try:
            try:
                while 1:
                    try:
                        c = self.listener.accept()
                    except (OSError, IOError):
                        continue
                    t = threading.Thread(target=self.handle_request, args=(c,))
                    t.daemon = True
                    t.start()
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            self.stop = 999
            self.listener.close()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one `(ignore, funcname, args, kwds)`
        request, calls the named public method with `c` as first argument
        and sends back either ('#RETURN', result) or ('#TRACEBACK', text).
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit check rather than `assert`: assert statements are
            # removed when Python runs with -O, which would let a client
            # reach any attribute of the server through getattr() below.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # sending failed: try to report that instead, then log
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until `stop` is set, reading `(ident, methodname, args, kwds)`
        requests from `conn` and answering each with one of '#RETURN',
        '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.  The thread
        exits on EOF or when a reply cannot be sent at all.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop:

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                obj, exposed, gettypeid = id_to_obj[ident]

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # results of methods listed in the method-to-typeid
                    # mapping are registered as new shared objects and
                    # returned by proxy rather than by value
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # the method is not available on the object: see
                    # whether one of the fallbacks can answer instead
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    send(('#UNSERIALIZABLE', repr(msg)))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # handlers used by serve_client() when a request raised AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_obj):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        return len(self.id_to_obj) - 1      # don't count ident='0'

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c`, runs finalizers, terminates and
        joins the active child processes and finally calls `exit(0)`.
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                # deliberately catches everything: whatever went wrong is
                # printed and the process still exits below
                import traceback
                traceback.print_exc()
        finally:
            exit(0)

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of the method
        names which may be called on the object.  If no callable was
        registered for `typeid`, the single positional argument is itself
        used as the shared object.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                assert len(args) == 1 and not kwds
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                assert type(method_to_typeid) is dict
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0
            # increment the reference count immediately, to avoid
            # this object being garbage collected before a Proxy
            # object for it can be created.  The caller of create()
            # is responsible for doing a decref once the Proxy object
            # has been created.
            self.incref(c, ident)
            return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            self.id_to_refcount[ident] += 1

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object with id `ident`

        The object is dropped from the server once its count reaches zero.
        '''
        with self.mutex:
            assert self.id_to_refcount[ident] >= 1
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_obj[ident], self.id_to_refcount[ident]
                util.debug('disposing of obj with id %r', ident)
445
#
# Class to represent state of a manager
#
449
class State(object):
    '''
    Holder for the current state of a manager

    The `value` attribute is expected to be one of the constants below.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = range(3)
455
#
# Mapping from serializer name to Listener and Client types
#
459
# each entry is a (Listener, Client) pair, keyed by serializer name
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
464
#
# Definition of BaseManager
#
468
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns its own server process with `start()` or
    attaches to an existing one with `connect()`.  Types are made
    available through the `register()` classmethod.
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]

    def __reduce__(self):
        # NOTE(review): no `from_address` attribute is defined on this
        # class in the visible source -- confirm it exists before relying
        # on pickling of manager objects.
        return type(self).from_address, \
               (self._address, self._authkey, self._serializer)

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # use the `_Server` hook, as `_run_server()` does, so that a
        # subclass overriding it gets the same server type either way
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the connection is only needed for this single request
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is given it is called in the server process as
        `initializer(*initargs)` before the server is created.  Raises
        TypeError if `initializer` is neither None nor callable.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # `_process` only exists once start() has been called
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(timeout)

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: the process is joined or terminated below
                pass

            process.join(timeout=0.2)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `callable` creates the shared objects, `proxytype` is the proxy
        class used for them (`AutoProxy` when None), `exposed` names the
        methods proxies may call and `method_to_typeid` maps method names
        to the typeid their results should be returned as.  When
        `create_method` is true a method called `typeid` is added to the
        manager class which creates a shared object and returns its proxy.
        '''
        # give this class its own registry rather than mutating the one
        # inherited from a base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()):
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the proxy now exists, so release the reference that the
                # server took on our behalf when creating the object
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
680
#
# Subclass of set which gets cleared after a fork
#
684
class ProcessLocalSet(set):
    '''
    Set which registers itself to be emptied after a fork and which
    always pickles as an empty set of the same type
    '''
    def __init__(self):
        def _clear_after_fork(obj):
            obj.clear()
        util.register_after_fork(self, _clear_after_fork)

    def __reduce__(self):
        return type(self), ()
690
#
# Definition of BaseProxy
#
694
695class BaseProxy(object):
696 '''
697 A base for proxies of shared objects
698 '''
699 _address_to_local = {}
700 _mutex = util.ForkAwareThreadLock()
701
702 def __init__(self, token, serializer, manager=None,
703 authkey=None, exposed=None, incref=True):
704 BaseProxy._mutex.acquire()
705 try:
706 tls_idset = BaseProxy._address_to_local.get(token.address, None)
707 if tls_idset is None:
708 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
709 BaseProxy._address_to_local[token.address] = tls_idset
710 finally:
711 BaseProxy._mutex.release()
712
713 # self._tls is used to record the connection used by this
714 # thread to communicate with the manager at token.address
715 self._tls = tls_idset[0]
716
717 # self._idset is used to record the identities of all shared
718 # objects for which the current process owns references and
719 # which are in the manager at token.address
720 self._idset = tls_idset[1]
721
722 self._token = token
723 self._id = self._token.id
724 self._manager = manager
725 self._serializer = serializer
726 self._Client = listener_client[serializer][1]
727
728 if authkey is not None:
729 self._authkey = AuthenticationString(authkey)
730 elif self._manager is not None:
731 self._authkey = self._manager._authkey
732 else:
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000733 self._authkey = current_process().authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000734
735 if incref:
736 self._incref()
737
738 util.register_after_fork(self, BaseProxy._after_fork)
739
740 def _connect(self):
741 util.debug('making connection to manager')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000742 name = current_process().name
Benjamin Peterson72753702008-08-18 18:09:21 +0000743 if threading.current_thread().name != 'MainThread':
744 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000745 conn = self._Client(self._token.address, authkey=self._authkey)
746 dispatch(conn, None, 'accept_connection', (name,))
747 self._tls.connection = conn
748
749 def _callmethod(self, methodname, args=(), kwds={}):
750 '''
751 Try to call a method of the referrent and return a copy of the result
752 '''
753 try:
754 conn = self._tls.connection
755 except AttributeError:
756 util.debug('thread %r does not own a connection',
Benjamin Peterson72753702008-08-18 18:09:21 +0000757 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000758 self._connect()
759 conn = self._tls.connection
760
761 conn.send((self._id, methodname, args, kwds))
762 kind, result = conn.recv()
763
764 if kind == '#RETURN':
765 return result
766 elif kind == '#PROXY':
767 exposed, token = result
768 proxytype = self._manager._registry[token.typeid][-1]
Jesse Noller824f4f32008-09-02 19:12:20 +0000769 proxy = proxytype(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000770 token, self._serializer, manager=self._manager,
771 authkey=self._authkey, exposed=exposed
772 )
Jesse Noller824f4f32008-09-02 19:12:20 +0000773 conn = self._Client(token.address, authkey=self._authkey)
774 dispatch(conn, None, 'decref', (token.id,))
775 return proxy
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 raise convert_to_error(kind, result)
777
778 def _getvalue(self):
779 '''
780 Get a copy of the value of the referent
781 '''
782 return self._callmethod('#GETVALUE')
783
784 def _incref(self):
785 conn = self._Client(self._token.address, authkey=self._authkey)
786 dispatch(conn, None, 'incref', (self._id,))
787 util.debug('INCREF %r', self._token.id)
788
789 self._idset.add(self._id)
790
791 state = self._manager and self._manager._state
792
793 self._close = util.Finalize(
794 self, BaseProxy._decref,
795 args=(self._token, self._authkey, state,
796 self._tls, self._idset, self._Client),
797 exitpriority=10
798 )
799
800 @staticmethod
801 def _decref(token, authkey, state, tls, idset, _Client):
802 idset.discard(token.id)
803
804 # check whether manager is still alive
805 if state is None or state.value == State.STARTED:
806 # tell manager this process no longer cares about referent
807 try:
808 util.debug('DECREF %r', token.id)
809 conn = _Client(token.address, authkey=authkey)
810 dispatch(conn, None, 'decref', (token.id,))
811 except Exception as e:
812 util.debug('... decref failed %s', e)
813
814 else:
815 util.debug('DECREF %r -- manager already shutdown', token.id)
816
817 # check whether we can close this thread's connection because
818 # the process owns no more references to objects for this manager
819 if not idset and hasattr(tls, 'connection'):
820 util.debug('thread %r has no more proxies so closing conn',
Benjamin Peterson72753702008-08-18 18:09:21 +0000821 threading.current_thread().name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000822 tls.connection.close()
823 del tls.connection
824
825 def _after_fork(self):
826 self._manager = None
827 try:
828 self._incref()
829 except Exception as e:
830 # the proxy may just be for a manager which has shutdown
831 util.info('incref failed: %s' % e)
832
833 def __reduce__(self):
834 kwds = {}
835 if Popen.thread_is_spawning():
836 kwds['authkey'] = self._authkey
837
838 if getattr(self, '_isauto', False):
839 kwds['exposed'] = self._exposed_
840 return (RebuildProxy,
841 (AutoProxy, self._token, self._serializer, kwds))
842 else:
843 return (RebuildProxy,
844 (type(self), self._token, self._serializer, kwds))
845
846 def __deepcopy__(self, memo):
847 return self._getvalue()
848
849 def __repr__(self):
850 return '<%s object, typeid %r at %s>' % \
851 (type(self).__name__, self._token.typeid, '0x%x' % id(self))
852
853 def __str__(self):
854 '''
855 Return representation of the referent (or a fall-back if that fails)
856 '''
857 try:
858 return self._callmethod('__repr__')
859 except Exception:
860 return repr(self)[:-1] + "; '__str__()' failed>"
861
#
# Function used for unpickling
#
865
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When unpickling happens inside the manager server which owns the
    object, the shared object itself is returned; otherwise `func` is
    called to build a proxy for it.  An 'incref' entry is removed from
    `kwds` and honoured unless the current process is marked as inheriting.
    '''
    process = current_process()
    server = getattr(process, '_manager_server', None)

    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]

    wants_incref = kwds.pop('incref', True)
    incref = wants_incref and not getattr(process, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
882
#
# Functions to create proxies and proxy types
#
886
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are cached per `(name, exposed)` pair in `_cache`, so asking
    again for the same combination returns the same class.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # each exposed name becomes a method forwarding to _callmethod()
    template = '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''
    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
907
908
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True):
    '''
    Return an auto-proxy for `token`

    The authentication key is `authkey` if given, otherwise the
    manager's key, otherwise that of the current process.  When `exposed`
    is None the list of methods is fetched from the server which owns the
    object.
    '''
    _Client = listener_client[serializer][1]

    # Work out the authentication key before contacting the server, so
    # that the 'get_methods' request below is made with the same key as
    # the proxy itself rather than with a possibly missing one.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref)
    # marks the proxy so that BaseProxy.__reduce__ pickles it as an AutoProxy
    proxy._isauto = True
    return proxy
933
#
# Types/callables which we will register with SyncManager
#
937
class Namespace(object):
    '''
    Simple object whose attributes are set from keyword arguments
    '''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # iterate over a snapshot of the attributes; names starting with
        # an underscore are left out of the representation
        snapshot = list(self.__dict__.items())
        shown = sorted('%s=%r' % (name, value)
                       for name, value in snapshot
                       if not name.startswith('_'))
        return 'Namespace(%s)' % ', '.join(shown)
949
class Value(object):
    '''
    Holder for a single value together with its typecode

    The `lock` argument is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        details = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % details

    value = property(get, set)
961
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialised from `sequence`

    The `lock` argument is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
964
#
# Proxy types used by SyncManager
#
968
class IteratorProxy(BaseProxy):
    '''
    Proxy which forwards the iterator/generator methods to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        return self._callmethod('__next__', args)

    def send(self, *args):
        return self._callmethod('send', args)

    def throw(self, *args):
        return self._callmethod('throw', args)

    def close(self, *args):
        return self._callmethod('close', args)
981
982
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire()` and `release()` methods

    It can be used in a `with` statement: entering calls `acquire` on the
    referent and leaving calls `release`.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True):
        call_args = (blocking,)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
993
994
class ConditionProxy(AcquirerProxy):
    '''
    Proxy type for condition variables.

    Adds `wait`, `notify` and `notify_all` to the `acquire`/`release`
    pair inherited from `AcquirerProxy`.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)

    def notify(self):
        return self._callmethod('notify')

    def notify_all(self):
        return self._callmethod('notify_all')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001003
class EventProxy(BaseProxy):
    '''
    Proxy type for event objects; every method is forwarded through
    `_callmethod`.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1014
class NamespaceProxy(BaseProxy):
    '''
    Proxy type that forwards public attribute access to the shared object.

    Names starting with an underscore are handled on the proxy itself via
    the `object` implementations; every other name is read, set or deleted
    through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        # startswith() rather than key[0]: an empty name is then forwarded
        # like any other public name instead of raising IndexError here.
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # Fetch _callmethod through object to bypass this class's hooks.
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1032
1033
class ValueProxy(BaseProxy):
    '''
    Proxy type exposing `get()`/`set()` and a `value` property built
    from them.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        set_args = (value,)
        return self._callmethod('set', set_args)

    value = property(get, set)
1041
1042
# Methods exposed for shared lists.  In Python 3 slicing goes through
# __getitem__, __setitem__ and __delitem__ with slice objects, so the
# Python 2 only __getslice__, __setslice__ and __delslice__ are not listed.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.
    '''
    def __iadd__(self, value):
        # `+=` is carried out as an `extend` call; returning self keeps
        # the left-hand name bound to the proxy.
        extend_args = (value,)
        self._callmethod('extend', extend_args)
        return self

    def __imul__(self, value):
        # The result of the forwarded call is discarded for the same reason.
        imul_args = (value,)
        self._callmethod('__imul__', imul_args)
        return self
1057
1058
# Methods exposed for shared dicts.  'has_key' is not listed: Python 3
# dicts have no such method, so a proxy method of that name could never
# succeed (use `key in proxy`, i.e. __contains__, instead).
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
1064
1065
# Methods exposed for shared arrays.  Slices are handled by __getitem__
# and __setitem__ in Python 3, so the Python 2 only __getslice__ and
# __setslice__ are not listed.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))
1069
1070
# Proxy type for pools, generated from the list of exposed method names.
PoolProxy = MakeProxyType(
    'PoolProxy',
    ('apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
     'map', 'map_async', 'terminate'),
    )
# Typeid associated with the return value of each listed method.
PoolProxy._method_to_typeid_ = dict(
    apply_async='AsyncResult',
    map_async='AsyncResult',
    imap='Iterator',
    imap_unordered='Iterator',
    )
1081
1082#
1083# Definition of SyncManager
1084#
1085
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads (`Event`, `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`
    and `Condition`), plus `Queue`, `JoinableQueue`, `Pool`, `list`,
    `dict`, `Value`, `Array` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
1096
# Queues: both typeids are created from queue.Queue and no proxy type
# is passed for them.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# Synchronization primitives from the threading module, each paired with
# the proxy type that forwards its methods.
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)

# Pool, containers and the simple value holders defined in this module.
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy (see PoolProxy._method_to_typeid_);
# they are registered with create_method=False and without a callable.
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)