Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
| 2 | # Module to allow connection and socket objects to be transferred |
| 3 | # between processes |
| 4 | # |
| 5 | # multiprocessing/reduction.py |
| 6 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 7 | # Copyright (c) 2006-2008, R Oudkerk |
Richard Oudkerk | 3e268aa | 2012-04-30 12:13:55 +0100 | [diff] [blame] | 8 | # Licensed to PSF under a Contributor Agreement. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 9 | # |
| 10 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 11 | __all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 12 | |
| 13 | import os |
| 14 | import sys |
| 15 | import socket |
| 16 | import threading |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 17 | import struct |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 18 | import signal |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 19 | |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 20 | from multiprocessing import current_process |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 21 | from multiprocessing.util import register_after_fork, debug, sub_debug |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 22 | from multiprocessing.util import is_exiting, sub_warning |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 23 | |
| 24 | |
| 25 | # |
| 26 | # |
| 27 | # |
| 28 | |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 29 | if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and |
| 30 | hasattr(socket, 'SCM_RIGHTS'))): |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 31 | raise ImportError('pickling of connections not supported') |
| 32 | |
| 33 | # |
| 34 | # Platform specific definitions |
| 35 | # |
| 36 | |
| 37 | if sys.platform == 'win32': |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 38 | # Windows |
| 39 | __all__ += ['reduce_pipe_connection'] |
Antoine Pitrou | 23bba4c | 2012-04-18 20:51:15 +0200 | [diff] [blame] | 40 | import _winapi |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 41 | |
| 42 | def send_handle(conn, handle, destination_pid): |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 43 | dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) |
| 44 | conn.send(dh) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 45 | |
| 46 | def recv_handle(conn): |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 47 | return conn.recv().detach() |
| 48 | |
| 49 | class DupHandle(object): |
| 50 | def __init__(self, handle, access, pid=None): |
| 51 | # duplicate handle for process with given pid |
| 52 | if pid is None: |
| 53 | pid = os.getpid() |
| 54 | proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) |
| 55 | try: |
| 56 | self._handle = _winapi.DuplicateHandle( |
| 57 | _winapi.GetCurrentProcess(), |
| 58 | handle, proc, access, False, 0) |
| 59 | finally: |
| 60 | _winapi.CloseHandle(proc) |
| 61 | self._access = access |
| 62 | self._pid = pid |
| 63 | |
| 64 | def detach(self): |
| 65 | # retrieve handle from process which currently owns it |
| 66 | if self._pid == os.getpid(): |
| 67 | return self._handle |
| 68 | proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, |
| 69 | self._pid) |
| 70 | try: |
| 71 | return _winapi.DuplicateHandle( |
| 72 | proc, self._handle, _winapi.GetCurrentProcess(), |
| 73 | self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) |
| 74 | finally: |
| 75 | _winapi.CloseHandle(proc) |
| 76 | |
| 77 | class DupSocket(object): |
| 78 | def __init__(self, sock): |
| 79 | new_sock = sock.dup() |
| 80 | def send(conn, pid): |
| 81 | share = new_sock.share(pid) |
| 82 | conn.send_bytes(share) |
| 83 | self._id = resource_sharer.register(send, new_sock.close) |
| 84 | |
| 85 | def detach(self): |
| 86 | conn = resource_sharer.get_connection(self._id) |
| 87 | try: |
| 88 | share = conn.recv_bytes() |
| 89 | return socket.fromshare(share) |
| 90 | finally: |
| 91 | conn.close() |
| 92 | |
| 93 | def reduce_socket(s): |
| 94 | return rebuild_socket, (DupSocket(s),) |
| 95 | |
| 96 | def rebuild_socket(ds): |
| 97 | return ds.detach() |
| 98 | |
| 99 | def reduce_connection(conn): |
| 100 | handle = conn.fileno() |
| 101 | with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: |
| 102 | ds = DupSocket(s) |
| 103 | return rebuild_connection, (ds, conn.readable, conn.writable) |
| 104 | |
| 105 | def rebuild_connection(ds, readable, writable): |
| 106 | from .connection import Connection |
| 107 | sock = ds.detach() |
| 108 | return Connection(sock.detach(), readable, writable) |
| 109 | |
| 110 | def reduce_pipe_connection(conn): |
| 111 | access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | |
| 112 | (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) |
| 113 | dh = DupHandle(conn.fileno(), access) |
| 114 | return rebuild_pipe_connection, (dh, conn.readable, conn.writable) |
| 115 | |
| 116 | def rebuild_pipe_connection(dh, readable, writable): |
| 117 | from .connection import PipeConnection |
| 118 | handle = dh.detach() |
| 119 | return PipeConnection(handle, readable, writable) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 120 | |
| 121 | else: |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 122 | # Unix |
Richard Oudkerk | 04ec8ce | 2012-08-16 16:48:55 +0100 | [diff] [blame^] | 123 | |
| 124 | # On MacOSX we should acknowledge receipt of fds -- see Issue14669 |
| 125 | ACKNOWLEDGE = sys.platform == 'darwin' |
| 126 | |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 127 | def send_handle(conn, handle, destination_pid): |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 128 | with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| 129 | s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, |
| 130 | struct.pack("@i", handle))]) |
Richard Oudkerk | 04ec8ce | 2012-08-16 16:48:55 +0100 | [diff] [blame^] | 131 | if ACKNOWLEDGE and conn.recv_bytes() != b'ACK': |
| 132 | raise RuntimeError('did not receive acknowledgement of fd') |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 133 | |
| 134 | def recv_handle(conn): |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 135 | size = struct.calcsize("@i") |
| 136 | with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| 137 | msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) |
| 138 | try: |
Richard Oudkerk | 04ec8ce | 2012-08-16 16:48:55 +0100 | [diff] [blame^] | 139 | if ACKNOWLEDGE: |
| 140 | conn.send_bytes(b'ACK') |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 141 | cmsg_level, cmsg_type, cmsg_data = ancdata[0] |
| 142 | if (cmsg_level == socket.SOL_SOCKET and |
| 143 | cmsg_type == socket.SCM_RIGHTS): |
| 144 | return struct.unpack("@i", cmsg_data[:size])[0] |
| 145 | except (ValueError, IndexError, struct.error): |
| 146 | pass |
| 147 | raise RuntimeError('Invalid data received') |
| 148 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 149 | class DupFd(object): |
| 150 | def __init__(self, fd): |
| 151 | new_fd = os.dup(fd) |
| 152 | def send(conn, pid): |
| 153 | send_handle(conn, new_fd, pid) |
| 154 | def close(): |
| 155 | os.close(new_fd) |
| 156 | self._id = resource_sharer.register(send, close) |
| 157 | |
| 158 | def detach(self): |
| 159 | conn = resource_sharer.get_connection(self._id) |
| 160 | try: |
| 161 | return recv_handle(conn) |
| 162 | finally: |
| 163 | conn.close() |
| 164 | |
| 165 | def reduce_socket(s): |
| 166 | df = DupFd(s.fileno()) |
| 167 | return rebuild_socket, (df, s.family, s.type, s.proto) |
| 168 | |
| 169 | def rebuild_socket(df, family, type, proto): |
| 170 | fd = df.detach() |
| 171 | s = socket.fromfd(fd, family, type, proto) |
| 172 | os.close(fd) |
| 173 | return s |
| 174 | |
| 175 | def reduce_connection(conn): |
| 176 | df = DupFd(conn.fileno()) |
| 177 | return rebuild_connection, (df, conn.readable, conn.writable) |
| 178 | |
| 179 | def rebuild_connection(df, readable, writable): |
| 180 | from .connection import Connection |
| 181 | fd = df.detach() |
| 182 | return Connection(fd, readable, writable) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 183 | |
| 184 | # |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 185 | # Server which shares registered resources with clients |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 186 | # |
| 187 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 188 | class ResourceSharer(object): |
| 189 | def __init__(self): |
| 190 | self._key = 0 |
| 191 | self._cache = {} |
| 192 | self._old_locks = [] |
| 193 | self._lock = threading.Lock() |
| 194 | self._listener = None |
| 195 | self._address = None |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 196 | self._thread = None |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 197 | register_after_fork(self, ResourceSharer._afterfork) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 198 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 199 | def register(self, send, close): |
| 200 | with self._lock: |
| 201 | if self._address is None: |
| 202 | self._start() |
| 203 | self._key += 1 |
| 204 | self._cache[self._key] = (send, close) |
| 205 | return (self._address, self._key) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 206 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 207 | @staticmethod |
| 208 | def get_connection(ident): |
| 209 | from .connection import Client |
| 210 | address, key = ident |
| 211 | c = Client(address, authkey=current_process().authkey) |
| 212 | c.send((key, os.getpid())) |
| 213 | return c |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 214 | |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 215 | def stop(self, timeout=None): |
| 216 | from .connection import Client |
| 217 | with self._lock: |
| 218 | if self._address is not None: |
| 219 | c = Client(self._address, authkey=current_process().authkey) |
| 220 | c.send(None) |
| 221 | c.close() |
| 222 | self._thread.join(timeout) |
| 223 | if self._thread.is_alive(): |
| 224 | sub_warn('ResourceSharer thread did not stop when asked') |
| 225 | self._listener.close() |
| 226 | self._thread = None |
| 227 | self._address = None |
| 228 | self._listener = None |
| 229 | for key, (send, close) in self._cache.items(): |
| 230 | close() |
| 231 | self._cache.clear() |
| 232 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 233 | def _afterfork(self): |
| 234 | for key, (send, close) in self._cache.items(): |
| 235 | close() |
| 236 | self._cache.clear() |
| 237 | # If self._lock was locked at the time of the fork, it may be broken |
| 238 | # -- see issue 6721. Replace it without letting it be gc'ed. |
| 239 | self._old_locks.append(self._lock) |
| 240 | self._lock = threading.Lock() |
| 241 | if self._listener is not None: |
| 242 | self._listener.close() |
| 243 | self._listener = None |
| 244 | self._address = None |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 245 | self._thread = None |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 246 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 247 | def _start(self): |
| 248 | from .connection import Listener |
| 249 | assert self._listener is None |
| 250 | debug('starting listener and thread for sending handles') |
| 251 | self._listener = Listener(authkey=current_process().authkey) |
| 252 | self._address = self._listener.address |
| 253 | t = threading.Thread(target=self._serve) |
| 254 | t.daemon = True |
| 255 | t.start() |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 256 | self._thread = t |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 257 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 258 | def _serve(self): |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 259 | if hasattr(signal, 'pthread_sigmask'): |
| 260 | signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 261 | while 1: |
| 262 | try: |
| 263 | conn = self._listener.accept() |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 264 | msg = conn.recv() |
| 265 | if msg is None: |
| 266 | break |
| 267 | key, destination_pid = msg |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 268 | send, close = self._cache.pop(key) |
| 269 | send(conn, destination_pid) |
| 270 | close() |
| 271 | conn.close() |
| 272 | except: |
| 273 | if not is_exiting(): |
| 274 | import traceback |
| 275 | sub_warning( |
| 276 | 'thread for sharing handles raised exception :\n' + |
| 277 | '-'*79 + '\n' + traceback.format_exc() + '-'*79 |
| 278 | ) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 279 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 280 | resource_sharer = ResourceSharer() |