Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 1 | # |
| 2 | # We use a background thread for sharing fds on Unix, and for sharing sockets on |
| 3 | # Windows. |
| 4 | # |
| 5 | # A client which wants to pickle a resource registers it with the resource |
| 6 | # sharer and gets an identifier in return. The unpickling process will connect |
| 7 | # to the resource sharer, send the identifier and its pid, and then receive |
| 8 | # the resource. |
| 9 | # |
| 10 | |
| 11 | import os |
| 12 | import signal |
| 13 | import socket |
| 14 | import sys |
| 15 | import threading |
| 16 | |
| 17 | from . import process |
| 18 | from . import reduction |
| 19 | from . import util |
| 20 | |
# Public names of this module; the platform-specific wrapper class defined
# further down appends its own name to this list.
__all__ = ['stop']
| 22 | |
| 23 | |
if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper around a duplicate of a socket.'''

        def __init__(self, sock):
            # Work on a duplicate of the socket; its close() method is
            # registered with the sharer as the cleanup callback.
            duplicate = sock.dup()

            def _send_share(conn, pid):
                # Share the duplicate with process ``pid`` and ship the
                # resulting bytes over the connection.
                conn.send_bytes(duplicate.share(pid))

            self._id = _resource_sharer.register(_send_share,
                                                 duplicate.close)

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return socket.fromshare(conn.recv_bytes())

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.'''

        def __init__(self, fd):
            # Work on a duplicate of the descriptor; the sharer is given
            # one callback to send it and one to close it.
            dup_fd = os.dup(fd)

            def _send_fd(conn, pid):
                reduction.send_handle(conn, dup_fd, pid)

            def _close_fd():
                os.close(dup_fd)

            self._id = _resource_sharer.register(_send_fd, _close_fd)

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)
| 59 | |
| 60 | |
class _ResourceSharer(object):
    '''Manager for resources using background thread.

    A resource is registered as a pair of callables ``(send, close)`` under
    an integer key.  The background thread serves each key at most once: it
    pops the entry, calls ``send(conn, pid)`` and then ``close()``.
    '''

    def __init__(self):
        self._key = 0                   # last key handed out by register()
        self._cache = {}                # key -> (send, close), not yet served
        self._old_locks = []            # locks replaced in _afterfork()
        self._lock = threading.Lock()   # guards register() and stop()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        ``send(conn, pid)`` is called to transfer the resource and
        ``close()`` to release it.  The listener and serving thread are
        started on first use.  Returns ``(address, key)``.
        '''
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        ``ident`` is the ``(address, key)`` pair returned by register().
        The key and the pid of the calling process are sent as the request.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        try:
            c.send((key, os.getpid()))
        except BaseException:
            # Do not leak the connection if the request cannot be sent.
            c.close()
            raise
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        ``timeout`` is passed to the thread's join(); a warning is issued
        if the thread is still alive afterwards.
        '''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A message of None tells the serving loop to exit.  The
                # with-block closes the connection even if send() fails.
                with Client(self._address,
                            authkey=process.current_process().authkey) as c:
                    c.send(None)
                self._thread.join(timeout)
                if self._thread.is_alive():
                    util.sub_warning('_ResourceSharer thread did '
                                     'not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                for _send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        '''Callback registered with util.register_after_fork().

        Closes and forgets every registered resource, replaces the lock and
        drops the listener, address and thread references.
        '''
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon serving thread.'''
        from .connection import Listener
        assert self._listener is None
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Serve requests until a message of None is received.'''
        if hasattr(signal, 'pthread_sigmask'):
            # Block signals in this thread.  range(1, NSIG) may contain
            # numbers that are not valid signals on this platform, so use
            # signal.valid_signals() when the running Python provides it.
            valid_signals = getattr(signal, 'valid_signals', None)
            if valid_signals is not None:
                to_block = valid_signals()
            else:
                to_block = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, to_block)
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    # pop(): each registered resource is served only once.
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Any failure while serving one request is reported (unless
                # the interpreter is exiting) and the loop keeps running.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())
| 155 | |
| 156 | |
# Module-level sharer instance used by the wrapper class above; its bound
# stop() method is exposed as the module's public ``stop`` function.
_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop