Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 2 | # Module which deals with pickling of objects. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 3 | # |
| 4 | # multiprocessing/reduction.py |
| 5 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 6 | # Copyright (c) 2006-2008, R Oudkerk |
Richard Oudkerk | 3e268aa | 2012-04-30 12:13:55 +0100 | [diff] [blame] | 7 | # Licensed to PSF under a Contributor Agreement. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 8 | # |
| 9 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 10 | import copyreg |
| 11 | import functools |
| 12 | import io |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 13 | import os |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 14 | import pickle |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 15 | import socket |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 16 | import sys |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 17 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 18 | from . import popen |
| 19 | from . import util |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 20 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 21 | __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] |
| 22 | |
| 23 | |
# True when handles/descriptors can be passed between processes: always
# on Windows, otherwise only when the socket module exposes CMSG_LEN,
# SCM_RIGHTS and socket.sendmsg().
HAVE_SEND_HANDLE = sys.platform == 'win32' or all((
    hasattr(socket, 'CMSG_LEN'),
    hasattr(socket, 'SCM_RIGHTS'),
    hasattr(socket.socket, 'sendmsg'),
))
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 28 | |
| 29 | # |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 30 | # Pickler subclass |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 31 | # |
| 32 | |
class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Every instance gets its own dispatch table: a copy of the copyreg
    dispatch table overlaid with the reducers added through register().
    '''
    # Reducers added through register(); shared by the whole class and
    # read when an instance is constructed.
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args, **kwds):
        '''Initialise the pickler; all arguments go to pickle.Pickler.

        Keyword arguments are forwarded as well, so that options which
        pickle.Pickler only accepts by keyword (e.g. fix_imports) can
        be used with this subclass.
        '''
        super().__init__(*args, **kwds)
        # Work on a private copy so the global copyreg table is never
        # modified by multiprocessing's reducers.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.

        Only picklers created after the call see the new reducer.
        '''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj and return a memoryview of the pickled bytes.'''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    # Unpickling needs no special support.
    loads = pickle.loads
| 55 | |
# Module-level shortcut for ForkingPickler.register().
register = ForkingPickler.register


def dump(obj, file, protocol=None):
    '''Pickle obj to file with a ForkingPickler (cf. pickle.dump()).'''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 61 | |
| 62 | # |
| 63 | # Platform specific definitions |
| 64 | # |
| 65 | |
| 66 | if sys.platform == 'win32': |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 67 | # Windows |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 68 | __all__ += ['DupHandle', 'duplicate', 'steal_handle'] |
Antoine Pitrou | 23bba4c | 2012-04-18 20:51:15 +0200 | [diff] [blame] | 69 | import _winapi |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 70 | |
def duplicate(handle, target_process=None, inheritable=False):
    '''Duplicate a handle owned by the current process.

    target_process is a process *handle*, not a pid; when omitted the
    duplicate is created in the current process.  The duplicate is
    requested with the same access as the original.
    '''
    destination = (_winapi.GetCurrentProcess() if target_process is None
                   else target_process)
    return _winapi.DuplicateHandle(
        _winapi.GetCurrentProcess(), handle, destination,
        0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
| 78 | |
def steal_handle(source_pid, handle):
    '''Take over a handle from the process whose pid is source_pid.

    The handle is duplicated into the current process with
    DUPLICATE_CLOSE_SOURCE, i.e. the source's copy is closed.
    '''
    options = _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE
    owner = _winapi.OpenProcess(
        _winapi.PROCESS_DUP_HANDLE, False, source_pid)
    try:
        return _winapi.DuplicateHandle(
            owner, handle, _winapi.GetCurrentProcess(), 0, False, options)
    finally:
        # The process handle was only needed for the duplication.
        _winapi.CloseHandle(owner)
| 90 | |
def send_handle(conn, handle, destination_pid):
    '''Wrap handle in a DupHandle for destination_pid and send it on conn.'''
    conn.send(
        DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid))
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 95 | |
def recv_handle(conn):
    '''Receive an object from conn and return the result of its detach().'''
    wrapper = conn.recv()
    return wrapper.detach()
| 99 | |
class DupHandle(object):
    '''Picklable wrapper for a handle.'''

    def __init__(self, handle, access, pid=None):
        '''Duplicate handle into the process pid with the given access.

        With pid omitted the duplicate is made in the current process
        and the receiving process is expected to steal it in detach().
        '''
        target_pid = os.getpid() if pid is None else pid
        target = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, target_pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(), handle, target,
                access, False, 0)
        finally:
            _winapi.CloseHandle(target)
        self._access = access
        self._pid = target_pid

    def detach(self):
        '''Get the handle.  This should only be called once.'''
        if self._pid != os.getpid():
            # Another process still owns the handle: duplicate it into
            # this process and close the owner's copy.
            owner = _winapi.OpenProcess(
                _winapi.PROCESS_DUP_HANDLE, False, self._pid)
            try:
                return _winapi.DuplicateHandle(
                    owner, self._handle, _winapi.GetCurrentProcess(),
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
            finally:
                _winapi.CloseHandle(owner)
        # The handle was duplicated for this very process already.
        return self._handle
| 132 | |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 133 | else: |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 134 | # Unix |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 135 | __all__ += ['DupFd', 'sendfds', 'recvfds'] |
| 136 | import array |
Richard Oudkerk | 04ec8ce | 2012-08-16 16:48:55 +0100 | [diff] [blame] | 137 | |
# Receipt of fds has to be acknowledged on Mac OS X -- see Issue14669.
ACKNOWLEDGE = (sys.platform == 'darwin')
| 140 | |
def sendfds(sock, fds):
    '''Send the file descriptors in fds over the AF_UNIX socket sock.

    The one-byte payload carries the number of descriptors modulo 256.
    Raises RuntimeError if an acknowledgement is required (ACKNOWLEDGE)
    and the byte read back is not b'A'.
    '''
    fd_array = array.array('i', fds)
    count_byte = bytes([len(fd_array) % 256])
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)]
    sock.sendmsg([count_byte], ancillary)
    if not ACKNOWLEDGE:
        return
    if sock.recv(1) != b'A':
        raise RuntimeError('did not receive acknowledgement of fd')
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 148 | |
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    size is the number of descriptors the ancillary buffer is sized
    for.  Returns the received descriptors as a list of ints.

    Raises EOFError if neither data nor ancillary data arrived,
    RuntimeError if the message is malformed, and AssertionError if the
    count byte does not match the number of descriptors received.
    '''
    a = array.array('i')
    bytes_size = a.itemsize * size
    msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        if ACKNOWLEDGE:
            sock.send(b'A')
        if len(ancdata) != 1:
            raise RuntimeError('received %d items of ancdata' %
                               len(ancdata))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
            if len(cmsg_data) % a.itemsize != 0:
                raise ValueError
            a.frombytes(cmsg_data)
            # Explicit check rather than an assert statement, so that it
            # is still performed when Python runs with -O.
            if len(a) % 256 != msg[0]:
                raise AssertionError(
                    'Len is {0:n} but msg[0] is {1!r}'.format(
                        len(a), msg[0]))
            return list(a)
    except (ValueError, IndexError):
        # Malformed message (including an empty payload, where msg[0]
        # fails): reported by the RuntimeError below.
        pass
    raise RuntimeError('Invalid data received')
| 173 | |
def send_handle(conn, handle, destination_pid):
    '''Send the descriptor handle over the local connection conn.

    destination_pid is not used by this implementation.
    '''
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        sendfds(sock, [handle])
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 178 | |
def recv_handle(conn):
    '''Receive a single descriptor over the local connection conn.'''
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        received = recvfds(sock, 1)
        return received[0]
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 183 | |
def DupFd(fd):
    '''Return a wrapper for an fd.

    If popen.get_spawning_popen() returns an object, that object
    creates the wrapper; otherwise resource_sharer.DupFd is used,
    provided HAVE_SEND_HANDLE is true.  Raises ValueError when neither
    route is available.
    '''
    spawning = popen.get_spawning_popen()
    if spawning is not None:
        return spawning.DupFd(spawning.duplicate_for_child(fd))
    if not HAVE_SEND_HANDLE:
        raise ValueError('SCM_RIGHTS appears not to be available')
    from . import resource_sharer
    return resource_sharer.DupFd(fd)
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 194 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame^] | 195 | # |
| 196 | # Try making some callable types picklable |
| 197 | # |
| 198 | |
def _reduce_method(m):
    '''Reduce method m to a getattr() call that looks it up by name.

    The lookup is done on m.__self__, or on m.__class__ when
    m.__self__ is None.
    '''
    owner = m.__class__ if m.__self__ is None else m.__self__
    return getattr, (owner, m.__func__.__name__)
class _C:
    # Exists only so that the type of a bound method can be obtained.
    def f(self):
        pass


ForkingPickler.register(type(_C().f), _reduce_method)
| 208 | |
| 209 | |
def _reduce_method_descriptor(m):
    '''Reduce descriptor m to getattr(m.__objclass__, m.__name__).'''
    owner, attr = m.__objclass__, m.__name__
    return getattr, (owner, attr)
# Cover both descriptor types: that of list.append and that of
# int.__add__.
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
| 214 | |
| 215 | |
def _reduce_partial(p):
    '''Reduce a functools.partial to (_rebuild_partial, its parts).'''
    keywords = p.keywords
    if not keywords:
        # Always hand a dict to _rebuild_partial, never a falsy value.
        keywords = {}
    return _rebuild_partial, (p.func, p.args, keywords)


def _rebuild_partial(func, args, keywords):
    '''Recreate a functools.partial from the parts saved on reduction.'''
    return functools.partial(func, *args, **keywords)
# Let functools.partial objects be pickled via _reduce_partial.
ForkingPickler.register(functools.partial, _reduce_partial)
| 221 | |
| 222 | # |
| 223 | # Make sockets picklable |
| 224 | # |
| 225 | |
if sys.platform != 'win32':
    def _reduce_socket(s):
        '''Reduce a socket to a DupFd wrapper plus its family/type/proto.'''
        wrapped_fd = DupFd(s.fileno())
        return _rebuild_socket, (wrapped_fd, s.family, s.type, s.proto)

    def _rebuild_socket(df, family, type, proto):
        '''Build a socket object around the descriptor detached from df.'''
        return socket.socket(family, type, proto, fileno=df.detach())

else:
    def _reduce_socket(s):
        '''Reduce a socket to a resource_sharer.DupSocket wrapper.'''
        from .resource_sharer import DupSocket
        return _rebuild_socket, (DupSocket(s),)

    def _rebuild_socket(ds):
        '''Return the result of detaching the DupSocket wrapper ds.'''
        return ds.detach()

# Whichever implementation was selected, use it for socket objects.
register(socket.socket, _reduce_socket)