Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
| 2 | # Module to allow connection and socket objects to be transferred |
| 3 | # between processes |
| 4 | # |
| 5 | # multiprocessing/reduction.py |
| 6 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 7 | # Copyright (c) 2006-2008, R Oudkerk |
Richard Oudkerk | 3e268aa | 2012-04-30 12:13:55 +0100 | [diff] [blame^] | 8 | # Licensed to PSF under a Contributor Agreement. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 9 | # |
| 10 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 11 | __all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 12 | |
| 13 | import os |
| 14 | import sys |
| 15 | import socket |
| 16 | import threading |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 17 | import struct |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 18 | import signal |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 19 | |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 20 | from multiprocessing import current_process |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 21 | from multiprocessing.util import register_after_fork, debug, sub_debug |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 22 | from multiprocessing.util import is_exiting, sub_warning |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 23 | |
| 24 | |
| 25 | # |
| 26 | # |
| 27 | # |
| 28 | |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 29 | if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and |
| 30 | hasattr(socket, 'SCM_RIGHTS'))): |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 31 | raise ImportError('pickling of connections not supported') |
| 32 | |
| 33 | # |
| 34 | # Platform specific definitions |
| 35 | # |
| 36 | |
| 37 | if sys.platform == 'win32': |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 38 | # Windows |
| 39 | __all__ += ['reduce_pipe_connection'] |
Antoine Pitrou | 23bba4c | 2012-04-18 20:51:15 +0200 | [diff] [blame] | 40 | import _winapi |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 41 | |
| 42 | def send_handle(conn, handle, destination_pid): |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 43 | dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) |
| 44 | conn.send(dh) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 45 | |
| 46 | def recv_handle(conn): |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 47 | return conn.recv().detach() |
| 48 | |
| 49 | class DupHandle(object): |
| 50 | def __init__(self, handle, access, pid=None): |
| 51 | # duplicate handle for process with given pid |
| 52 | if pid is None: |
| 53 | pid = os.getpid() |
| 54 | proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) |
| 55 | try: |
| 56 | self._handle = _winapi.DuplicateHandle( |
| 57 | _winapi.GetCurrentProcess(), |
| 58 | handle, proc, access, False, 0) |
| 59 | finally: |
| 60 | _winapi.CloseHandle(proc) |
| 61 | self._access = access |
| 62 | self._pid = pid |
| 63 | |
| 64 | def detach(self): |
| 65 | # retrieve handle from process which currently owns it |
| 66 | if self._pid == os.getpid(): |
| 67 | return self._handle |
| 68 | proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, |
| 69 | self._pid) |
| 70 | try: |
| 71 | return _winapi.DuplicateHandle( |
| 72 | proc, self._handle, _winapi.GetCurrentProcess(), |
| 73 | self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) |
| 74 | finally: |
| 75 | _winapi.CloseHandle(proc) |
| 76 | |
| 77 | class DupSocket(object): |
| 78 | def __init__(self, sock): |
| 79 | new_sock = sock.dup() |
| 80 | def send(conn, pid): |
| 81 | share = new_sock.share(pid) |
| 82 | conn.send_bytes(share) |
| 83 | self._id = resource_sharer.register(send, new_sock.close) |
| 84 | |
| 85 | def detach(self): |
| 86 | conn = resource_sharer.get_connection(self._id) |
| 87 | try: |
| 88 | share = conn.recv_bytes() |
| 89 | return socket.fromshare(share) |
| 90 | finally: |
| 91 | conn.close() |
| 92 | |
| 93 | def reduce_socket(s): |
| 94 | return rebuild_socket, (DupSocket(s),) |
| 95 | |
| 96 | def rebuild_socket(ds): |
| 97 | return ds.detach() |
| 98 | |
| 99 | def reduce_connection(conn): |
| 100 | handle = conn.fileno() |
| 101 | with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: |
| 102 | ds = DupSocket(s) |
| 103 | return rebuild_connection, (ds, conn.readable, conn.writable) |
| 104 | |
| 105 | def rebuild_connection(ds, readable, writable): |
| 106 | from .connection import Connection |
| 107 | sock = ds.detach() |
| 108 | return Connection(sock.detach(), readable, writable) |
| 109 | |
| 110 | def reduce_pipe_connection(conn): |
| 111 | access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | |
| 112 | (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) |
| 113 | dh = DupHandle(conn.fileno(), access) |
| 114 | return rebuild_pipe_connection, (dh, conn.readable, conn.writable) |
| 115 | |
| 116 | def rebuild_pipe_connection(dh, readable, writable): |
| 117 | from .connection import PipeConnection |
| 118 | handle = dh.detach() |
| 119 | return PipeConnection(handle, readable, writable) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 120 | |
| 121 | else: |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 122 | # Unix |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 123 | def send_handle(conn, handle, destination_pid): |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 124 | with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| 125 | s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, |
| 126 | struct.pack("@i", handle))]) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 127 | |
| 128 | def recv_handle(conn): |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 129 | size = struct.calcsize("@i") |
| 130 | with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| 131 | msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) |
| 132 | try: |
| 133 | cmsg_level, cmsg_type, cmsg_data = ancdata[0] |
| 134 | if (cmsg_level == socket.SOL_SOCKET and |
| 135 | cmsg_type == socket.SCM_RIGHTS): |
| 136 | return struct.unpack("@i", cmsg_data[:size])[0] |
| 137 | except (ValueError, IndexError, struct.error): |
| 138 | pass |
| 139 | raise RuntimeError('Invalid data received') |
| 140 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 141 | class DupFd(object): |
| 142 | def __init__(self, fd): |
| 143 | new_fd = os.dup(fd) |
| 144 | def send(conn, pid): |
| 145 | send_handle(conn, new_fd, pid) |
| 146 | def close(): |
| 147 | os.close(new_fd) |
| 148 | self._id = resource_sharer.register(send, close) |
| 149 | |
| 150 | def detach(self): |
| 151 | conn = resource_sharer.get_connection(self._id) |
| 152 | try: |
| 153 | return recv_handle(conn) |
| 154 | finally: |
| 155 | conn.close() |
| 156 | |
| 157 | def reduce_socket(s): |
| 158 | df = DupFd(s.fileno()) |
| 159 | return rebuild_socket, (df, s.family, s.type, s.proto) |
| 160 | |
| 161 | def rebuild_socket(df, family, type, proto): |
| 162 | fd = df.detach() |
| 163 | s = socket.fromfd(fd, family, type, proto) |
| 164 | os.close(fd) |
| 165 | return s |
| 166 | |
| 167 | def reduce_connection(conn): |
| 168 | df = DupFd(conn.fileno()) |
| 169 | return rebuild_connection, (df, conn.readable, conn.writable) |
| 170 | |
| 171 | def rebuild_connection(df, readable, writable): |
| 172 | from .connection import Connection |
| 173 | fd = df.detach() |
| 174 | return Connection(fd, readable, writable) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 175 | |
| 176 | # |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 177 | # Server which shares registered resources with clients |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 178 | # |
| 179 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 180 | class ResourceSharer(object): |
| 181 | def __init__(self): |
| 182 | self._key = 0 |
| 183 | self._cache = {} |
| 184 | self._old_locks = [] |
| 185 | self._lock = threading.Lock() |
| 186 | self._listener = None |
| 187 | self._address = None |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 188 | self._thread = None |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 189 | register_after_fork(self, ResourceSharer._afterfork) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 190 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 191 | def register(self, send, close): |
| 192 | with self._lock: |
| 193 | if self._address is None: |
| 194 | self._start() |
| 195 | self._key += 1 |
| 196 | self._cache[self._key] = (send, close) |
| 197 | return (self._address, self._key) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 198 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 199 | @staticmethod |
| 200 | def get_connection(ident): |
| 201 | from .connection import Client |
| 202 | address, key = ident |
| 203 | c = Client(address, authkey=current_process().authkey) |
| 204 | c.send((key, os.getpid())) |
| 205 | return c |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 206 | |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 207 | def stop(self, timeout=None): |
| 208 | from .connection import Client |
| 209 | with self._lock: |
| 210 | if self._address is not None: |
| 211 | c = Client(self._address, authkey=current_process().authkey) |
| 212 | c.send(None) |
| 213 | c.close() |
| 214 | self._thread.join(timeout) |
| 215 | if self._thread.is_alive(): |
| 216 | sub_warn('ResourceSharer thread did not stop when asked') |
| 217 | self._listener.close() |
| 218 | self._thread = None |
| 219 | self._address = None |
| 220 | self._listener = None |
| 221 | for key, (send, close) in self._cache.items(): |
| 222 | close() |
| 223 | self._cache.clear() |
| 224 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 225 | def _afterfork(self): |
| 226 | for key, (send, close) in self._cache.items(): |
| 227 | close() |
| 228 | self._cache.clear() |
| 229 | # If self._lock was locked at the time of the fork, it may be broken |
| 230 | # -- see issue 6721. Replace it without letting it be gc'ed. |
| 231 | self._old_locks.append(self._lock) |
| 232 | self._lock = threading.Lock() |
| 233 | if self._listener is not None: |
| 234 | self._listener.close() |
| 235 | self._listener = None |
| 236 | self._address = None |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 237 | self._thread = None |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 238 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 239 | def _start(self): |
| 240 | from .connection import Listener |
| 241 | assert self._listener is None |
| 242 | debug('starting listener and thread for sending handles') |
| 243 | self._listener = Listener(authkey=current_process().authkey) |
| 244 | self._address = self._listener.address |
| 245 | t = threading.Thread(target=self._serve) |
| 246 | t.daemon = True |
| 247 | t.start() |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 248 | self._thread = t |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 249 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 250 | def _serve(self): |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 251 | if hasattr(signal, 'pthread_sigmask'): |
| 252 | signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 253 | while 1: |
| 254 | try: |
| 255 | conn = self._listener.accept() |
Antoine Pitrou | 92ff4e1 | 2012-04-27 23:51:03 +0200 | [diff] [blame] | 256 | msg = conn.recv() |
| 257 | if msg is None: |
| 258 | break |
| 259 | key, destination_pid = msg |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 260 | send, close = self._cache.pop(key) |
| 261 | send(conn, destination_pid) |
| 262 | close() |
| 263 | conn.close() |
| 264 | except: |
| 265 | if not is_exiting(): |
| 266 | import traceback |
| 267 | sub_warning( |
| 268 | 'thread for sharing handles raised exception :\n' + |
| 269 | '-'*79 + '\n' + traceback.format_exc() + '-'*79 |
| 270 | ) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 271 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 272 | resource_sharer = ResourceSharer() |