Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
| 2 | # Module to allow connection and socket objects to be transferred |
| 3 | # between processes |
| 4 | # |
| 5 | # multiprocessing/reduction.py |
| 6 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 7 | # Copyright (c) 2006-2008, R Oudkerk |
| 8 | # All rights reserved. |
| 9 | # |
| 10 | # Redistribution and use in source and binary forms, with or without |
| 11 | # modification, are permitted provided that the following conditions |
| 12 | # are met: |
| 13 | # |
| 14 | # 1. Redistributions of source code must retain the above copyright |
| 15 | # notice, this list of conditions and the following disclaimer. |
| 16 | # 2. Redistributions in binary form must reproduce the above copyright |
| 17 | # notice, this list of conditions and the following disclaimer in the |
| 18 | # documentation and/or other materials provided with the distribution. |
| 19 | # 3. Neither the name of author nor the names of any contributors may be |
| 20 | # used to endorse or promote products derived from this software |
| 21 | # without specific prior written permission. |
| 22 | # |
| 23 | # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND |
| 24 | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| 25 | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| 26 | # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE |
| 27 | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
| 28 | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
| 29 | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
| 30 | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
| 31 | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
| 32 | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
| 33 | # SUCH DAMAGE. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 34 | # |
| 35 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 36 | __all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 37 | |
| 38 | import os |
| 39 | import sys |
| 40 | import socket |
| 41 | import threading |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 42 | import struct |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 43 | |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 44 | from multiprocessing import current_process |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 45 | from multiprocessing.util import register_after_fork, debug, sub_debug |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 46 | from multiprocessing.util import is_exiting, sub_warning |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 47 | |
| 48 | |
| 49 | # |
| 50 | # |
| 51 | # |
| 52 | |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 53 | if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and |
| 54 | hasattr(socket, 'SCM_RIGHTS'))): |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 55 | raise ImportError('pickling of connections not supported') |
| 56 | |
| 57 | # |
| 58 | # Platform specific definitions |
| 59 | # |
| 60 | |
| 61 | if sys.platform == 'win32': |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 62 | # Windows |
| 63 | __all__ += ['reduce_pipe_connection'] |
Antoine Pitrou | 23bba4c | 2012-04-18 20:51:15 +0200 | [diff] [blame] | 64 | import _winapi |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 65 | |
| 66 | def send_handle(conn, handle, destination_pid): |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 67 | dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) |
| 68 | conn.send(dh) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 69 | |
| 70 | def recv_handle(conn): |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 71 | return conn.recv().detach() |
| 72 | |
| 73 | class DupHandle(object): |
| 74 | def __init__(self, handle, access, pid=None): |
| 75 | # duplicate handle for process with given pid |
| 76 | if pid is None: |
| 77 | pid = os.getpid() |
| 78 | proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) |
| 79 | try: |
| 80 | self._handle = _winapi.DuplicateHandle( |
| 81 | _winapi.GetCurrentProcess(), |
| 82 | handle, proc, access, False, 0) |
| 83 | finally: |
| 84 | _winapi.CloseHandle(proc) |
| 85 | self._access = access |
| 86 | self._pid = pid |
| 87 | |
| 88 | def detach(self): |
| 89 | # retrieve handle from process which currently owns it |
| 90 | if self._pid == os.getpid(): |
| 91 | return self._handle |
| 92 | proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, |
| 93 | self._pid) |
| 94 | try: |
| 95 | return _winapi.DuplicateHandle( |
| 96 | proc, self._handle, _winapi.GetCurrentProcess(), |
| 97 | self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) |
| 98 | finally: |
| 99 | _winapi.CloseHandle(proc) |
| 100 | |
| 101 | class DupSocket(object): |
| 102 | def __init__(self, sock): |
| 103 | new_sock = sock.dup() |
| 104 | def send(conn, pid): |
| 105 | share = new_sock.share(pid) |
| 106 | conn.send_bytes(share) |
| 107 | self._id = resource_sharer.register(send, new_sock.close) |
| 108 | |
| 109 | def detach(self): |
| 110 | conn = resource_sharer.get_connection(self._id) |
| 111 | try: |
| 112 | share = conn.recv_bytes() |
| 113 | return socket.fromshare(share) |
| 114 | finally: |
| 115 | conn.close() |
| 116 | |
| 117 | def reduce_socket(s): |
| 118 | return rebuild_socket, (DupSocket(s),) |
| 119 | |
| 120 | def rebuild_socket(ds): |
| 121 | return ds.detach() |
| 122 | |
| 123 | def reduce_connection(conn): |
| 124 | handle = conn.fileno() |
| 125 | with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: |
| 126 | ds = DupSocket(s) |
| 127 | return rebuild_connection, (ds, conn.readable, conn.writable) |
| 128 | |
| 129 | def rebuild_connection(ds, readable, writable): |
| 130 | from .connection import Connection |
| 131 | sock = ds.detach() |
| 132 | return Connection(sock.detach(), readable, writable) |
| 133 | |
| 134 | def reduce_pipe_connection(conn): |
| 135 | access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | |
| 136 | (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) |
| 137 | dh = DupHandle(conn.fileno(), access) |
| 138 | return rebuild_pipe_connection, (dh, conn.readable, conn.writable) |
| 139 | |
| 140 | def rebuild_pipe_connection(dh, readable, writable): |
| 141 | from .connection import PipeConnection |
| 142 | handle = dh.detach() |
| 143 | return PipeConnection(handle, readable, writable) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 144 | |
| 145 | else: |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 146 | # Unix |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 147 | def send_handle(conn, handle, destination_pid): |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 148 | with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| 149 | s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, |
| 150 | struct.pack("@i", handle))]) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 151 | |
| 152 | def recv_handle(conn): |
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 153 | size = struct.calcsize("@i") |
| 154 | with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| 155 | msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) |
| 156 | try: |
| 157 | cmsg_level, cmsg_type, cmsg_data = ancdata[0] |
| 158 | if (cmsg_level == socket.SOL_SOCKET and |
| 159 | cmsg_type == socket.SCM_RIGHTS): |
| 160 | return struct.unpack("@i", cmsg_data[:size])[0] |
| 161 | except (ValueError, IndexError, struct.error): |
| 162 | pass |
| 163 | raise RuntimeError('Invalid data received') |
| 164 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 165 | class DupFd(object): |
| 166 | def __init__(self, fd): |
| 167 | new_fd = os.dup(fd) |
| 168 | def send(conn, pid): |
| 169 | send_handle(conn, new_fd, pid) |
| 170 | def close(): |
| 171 | os.close(new_fd) |
| 172 | self._id = resource_sharer.register(send, close) |
| 173 | |
| 174 | def detach(self): |
| 175 | conn = resource_sharer.get_connection(self._id) |
| 176 | try: |
| 177 | return recv_handle(conn) |
| 178 | finally: |
| 179 | conn.close() |
| 180 | |
| 181 | def reduce_socket(s): |
| 182 | df = DupFd(s.fileno()) |
| 183 | return rebuild_socket, (df, s.family, s.type, s.proto) |
| 184 | |
| 185 | def rebuild_socket(df, family, type, proto): |
| 186 | fd = df.detach() |
| 187 | s = socket.fromfd(fd, family, type, proto) |
| 188 | os.close(fd) |
| 189 | return s |
| 190 | |
| 191 | def reduce_connection(conn): |
| 192 | df = DupFd(conn.fileno()) |
| 193 | return rebuild_connection, (df, conn.readable, conn.writable) |
| 194 | |
| 195 | def rebuild_connection(df, readable, writable): |
| 196 | from .connection import Connection |
| 197 | fd = df.detach() |
| 198 | return Connection(fd, readable, writable) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 199 | |
| 200 | # |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 201 | # Server which shares registered resources with clients |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 202 | # |
| 203 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 204 | class ResourceSharer(object): |
| 205 | def __init__(self): |
| 206 | self._key = 0 |
| 207 | self._cache = {} |
| 208 | self._old_locks = [] |
| 209 | self._lock = threading.Lock() |
| 210 | self._listener = None |
| 211 | self._address = None |
| 212 | register_after_fork(self, ResourceSharer._afterfork) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 213 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 214 | def register(self, send, close): |
| 215 | with self._lock: |
| 216 | if self._address is None: |
| 217 | self._start() |
| 218 | self._key += 1 |
| 219 | self._cache[self._key] = (send, close) |
| 220 | return (self._address, self._key) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 221 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 222 | @staticmethod |
| 223 | def get_connection(ident): |
| 224 | from .connection import Client |
| 225 | address, key = ident |
| 226 | c = Client(address, authkey=current_process().authkey) |
| 227 | c.send((key, os.getpid())) |
| 228 | return c |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 229 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 230 | def _afterfork(self): |
| 231 | for key, (send, close) in self._cache.items(): |
| 232 | close() |
| 233 | self._cache.clear() |
| 234 | # If self._lock was locked at the time of the fork, it may be broken |
| 235 | # -- see issue 6721. Replace it without letting it be gc'ed. |
| 236 | self._old_locks.append(self._lock) |
| 237 | self._lock = threading.Lock() |
| 238 | if self._listener is not None: |
| 239 | self._listener.close() |
| 240 | self._listener = None |
| 241 | self._address = None |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 242 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 243 | def _start(self): |
| 244 | from .connection import Listener |
| 245 | assert self._listener is None |
| 246 | debug('starting listener and thread for sending handles') |
| 247 | self._listener = Listener(authkey=current_process().authkey) |
| 248 | self._address = self._listener.address |
| 249 | t = threading.Thread(target=self._serve) |
| 250 | t.daemon = True |
| 251 | t.start() |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 252 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 253 | def _serve(self): |
| 254 | while 1: |
| 255 | try: |
| 256 | conn = self._listener.accept() |
| 257 | key, destination_pid = conn.recv() |
| 258 | send, close = self._cache.pop(key) |
| 259 | send(conn, destination_pid) |
| 260 | close() |
| 261 | conn.close() |
| 262 | except: |
| 263 | if not is_exiting(): |
| 264 | import traceback |
| 265 | sub_warning( |
| 266 | 'thread for sharing handles raised exception :\n' + |
| 267 | '-'*79 + '\n' + traceback.format_exc() + '-'*79 |
| 268 | ) |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 269 | |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame^] | 270 | resource_sharer = ResourceSharer() |