| # |
| # We use a background thread for sharing fds on Unix, and for sharing sockets on |
| # Windows. |
| # |
| # A client which wants to pickle a resource registers it with the resource |
| # sharer and gets an identifier in return. The unpickling process connects |
| # to the resource sharer, sends the identifier and its pid, and then receives |
| # the resource. |
| # |
| |
| import os |
| import signal |
| import socket |
| import sys |
| import threading |
| |
| from . import process |
| from . import reduction |
| from . import util |
| |
__all__ = ['stop']


if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper for a socket.

        The socket is duplicated on construction and the duplicate is
        registered with the module-level resource sharer.  Only the
        identifier returned by the sharer is stored on the instance.
        '''
        def __init__(self, sock):
            new_sock = sock.dup()

            def send(conn, pid):
                # Export the duplicate for process `pid` and ship the
                # resulting bytes over the connection.
                share = new_sock.share(pid)
                conn.send_bytes(share)

            try:
                self._id = _resource_sharer.register(send, new_sock.close)
            except Exception:
                # Registration failed, so the sharer never took ownership
                # of the duplicate; close it here rather than leak it.
                new_sock.close()
                raise

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.

        The descriptor is duplicated on construction and the duplicate is
        registered with the module-level resource sharer.  Only the
        identifier returned by the sharer is stored on the instance.
        '''
        def __init__(self, fd):
            new_fd = os.dup(fd)

            def send(conn, pid):
                reduction.send_handle(conn, new_fd, pid)

            def close():
                os.close(new_fd)

            try:
                self._id = _resource_sharer.register(send, close)
            except Exception:
                # Registration failed, so the sharer never took ownership
                # of the duplicate; close it here rather than leak it.
                close()
                raise

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)
| |
| |
class _ResourceSharer(object):
    '''Manager for resources using background thread.

    Each registered resource is a ``(send, close)`` pair of callables kept
    in ``_cache`` under an integer key.  A listener and a serving thread
    are started lazily on the first call to register().
    '''

    def __init__(self):
        self._key = 0             # last key handed out by register()
        self._cache = {}          # key -> (send, close)
        self._old_locks = []      # locks replaced by _afterfork(), kept alive
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        `send` is later called as ``send(conn, pid)`` by the serving
        thread and `close` is called with no arguments once the resource
        has been sent or discarded.  The returned identifier is the tuple
        ``(listener address, key)``.
        '''
        with self._lock:
            if self._address is None:
                # First registration: bring up the listener and thread.
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        `ident` is an ``(address, key)`` tuple as returned by register().
        The key and the caller's pid are sent over the new connection
        before it is returned.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        c.send((key, os.getpid()))
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        `timeout` is passed to the thread's join(); a warning is issued
        through util.sub_warning if the thread is still alive afterwards.
        '''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A None message is the signal for _serve() to leave its loop.
                c = Client(self._address,
                           authkey=process.current_process().authkey)
                c.send(None)
                c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    util.sub_warning('_ResourceSharer thread did '
                                     'not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                # Release every resource that was never collected.
                for _send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        '''Discard all registered resources and reset to the unstarted state.

        Registered with util.register_after_fork in __init__; presumably
        run in the child process after a fork -- confirm against util.
        '''
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon thread running _serve().'''
        from .connection import Listener
        assert self._listener is None
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Serve resource requests until a None message is received.

        Each accepted connection delivers either None (stop) or a
        ``(key, destination_pid)`` pair.  The matching resource is removed
        from the cache, sent, and then closed whether or not sending
        succeeded.
        '''
        if hasattr(signal, 'pthread_sigmask'):
            # Block signals in this thread.  Prefer valid_signals(), since
            # range(1, NSIG) may contain numbers the platform reserves.
            if hasattr(signal, 'valid_signals'):
                blocked = signal.valid_signals()
            else:
                blocked = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, blocked)
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Deliberately broad: a failed request is reported and the
                # loop keeps serving, unless the process is exiting.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())
| |
# Single module-wide sharer instance; the DupSocket/DupFd wrappers defined
# in this module register their resources with it.
_resource_sharer = _ResourceSharer()

# Public entry point (listed in __all__): bound method of the singleton.
stop = _resource_sharer.stop