Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 2 | # Module which deals with pickling of objects. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 3 | # |
| 4 | # multiprocessing/reduction.py |
| 5 | # |
R. David Murray | 3fc969a | 2010-12-14 01:38:16 +0000 | [diff] [blame] | 6 | # Copyright (c) 2006-2008, R Oudkerk |
Richard Oudkerk | 3e268aa | 2012-04-30 12:13:55 +0100 | [diff] [blame] | 7 | # Licensed to PSF under a Contributor Agreement. |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 8 | # |
| 9 | |
Victor Stinner | d6debb2 | 2017-03-27 16:05:26 +0200 | [diff] [blame] | 10 | from abc import ABCMeta |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 11 | import copyreg |
| 12 | import functools |
| 13 | import io |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 14 | import os |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 15 | import pickle |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 16 | import socket |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 17 | import sys |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 18 | |
Richard Oudkerk | b1694cf | 2013-10-16 16:41:56 +0100 | [diff] [blame] | 19 | from . import context |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 20 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 21 | __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] |
| 22 | |
| 23 | |
# True on Windows unconditionally; elsewhere only when the socket module
# exposes CMSG_LEN, SCM_RIGHTS and socket.sendmsg (the pieces sendfds()
# and recvfds() rely on).
HAVE_SEND_HANDLE = (sys.platform == 'win32' or
                    (hasattr(socket, 'CMSG_LEN') and
                     hasattr(socket, 'SCM_RIGHTS') and
                     hasattr(socket.socket, 'sendmsg')))
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 28 | |
| 29 | # |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 30 | # Pickler subclass |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 31 | # |
| 32 | |
class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Every instance gets a private dispatch table: a copy of the global
    copyreg table overlaid with the reducers added through register().
    '''
    # Class-level mapping of type -> reduce function, filled by register().
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args, **kwargs):
        '''Create a pickler; all arguments are forwarded to pickle.Pickler.

        Keyword arguments (for example protocol=...) are accepted in
        addition to positional ones.
        '''
        super().__init__(*args, **kwargs)
        # Build the table per instance so that reducers registered before
        # construction are honoured without mutating copyreg's global table.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj with this class and return a memoryview of the data.'''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    # Unpickling needs no customisation, so pickle.loads is reused as is.
    loads = pickle.loads
| 55 | |
# Module-level alias for the ForkingPickler.register classmethod.
register = ForkingPickler.register
| 57 | |
def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.'''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 61 | |
| 62 | # |
| 63 | # Platform specific definitions |
| 64 | # |
| 65 | |
| 66 | if sys.platform == 'win32': |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 67 | # Windows |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 68 | __all__ += ['DupHandle', 'duplicate', 'steal_handle'] |
Antoine Pitrou | 23bba4c | 2012-04-18 20:51:15 +0200 | [diff] [blame] | 69 | import _winapi |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 70 | |
def duplicate(handle, target_process=None, inheritable=False,
              *, source_process=None):
    '''Duplicate a handle.  (target_process is a handle not a pid!)

    Both source_process and target_process default to the current
    process when left as None.
    '''
    this_process = _winapi.GetCurrentProcess()
    src = this_process if source_process is None else source_process
    dst = this_process if target_process is None else target_process
    return _winapi.DuplicateHandle(
        src, handle, dst,
        0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
| 82 | |
def steal_handle(source_pid, handle):
    '''Steal a handle from process identified by source_pid.'''
    owner = _winapi.OpenProcess(
        _winapi.PROCESS_DUP_HANDLE, False, source_pid)
    try:
        # DUPLICATE_CLOSE_SOURCE makes this a move rather than a copy.
        options = (_winapi.DUPLICATE_SAME_ACCESS |
                   _winapi.DUPLICATE_CLOSE_SOURCE)
        return _winapi.DuplicateHandle(
            owner, handle, _winapi.GetCurrentProcess(),
            0, False, options)
    finally:
        # The process handle is only needed for the duplication call.
        _winapi.CloseHandle(owner)
| 94 | |
def send_handle(conn, handle, destination_pid):
    '''Send a handle over a local connection.'''
    # The DupHandle wrapper is what actually travels over the connection.
    conn.send(
        DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid))
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 99 | |
def recv_handle(conn):
    '''Receive a handle over a local connection.'''
    # The received object is expected to provide detach().
    wrapper = conn.recv()
    return wrapper.detach()
| 103 | |
class DupHandle:
    '''Picklable wrapper for a handle.'''

    def __init__(self, handle, access, pid=None):
        '''Duplicate handle into the process identified by pid.

        When pid is None the handle is duplicated in the current process
        and the receiving process is expected to steal it in detach().
        '''
        if pid is None:
            pid = os.getpid()
        target = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, target, access, False, 0)
        finally:
            _winapi.CloseHandle(target)
        self._access = access
        self._pid = pid

    def detach(self):
        '''Get the handle.  This should only be called once.'''
        owner_pid = self._pid
        if owner_pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        # Otherwise take the handle from the process that holds it;
        # DUPLICATE_CLOSE_SOURCE closes the copy in that process.
        owner = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, owner_pid)
        try:
            return _winapi.DuplicateHandle(
                owner, self._handle, _winapi.GetCurrentProcess(),
                self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            _winapi.CloseHandle(owner)
| 136 | |
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 137 | else: |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 138 | # Unix |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 139 | __all__ += ['DupFd', 'sendfds', 'recvfds'] |
| 140 | import array |
Richard Oudkerk | 04ec8ce | 2012-08-16 16:48:55 +0100 | [diff] [blame] | 141 | |
# On macOS we should acknowledge receipt of fds -- see Issue14669.
# When true, recvfds() sends b'A' back and sendfds() waits for it.
ACKNOWLEDGE = sys.platform == 'darwin'
| 144 | |
def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.'''
    fd_array = array.array('i', fds)
    # The single data byte carries the fd count modulo 256, which
    # recvfds() checks against what it actually received.
    count_byte = bytes([len(fd_array) % 256])
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)]
    sock.sendmsg([count_byte], ancillary)
    if not ACKNOWLEDGE:
        return
    if sock.recv(1) != b'A':
        raise RuntimeError('did not receive acknowledgement of fd')
Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 152 | |
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    size is the maximum number of fds to make room for.  Raises EOFError
    when neither data nor ancillary data arrives, and RuntimeError when
    the received message is malformed.
    '''
    received = array.array('i')
    anc_bufsize = socket.CMSG_SPACE(received.itemsize * size)
    msg, ancdata, _flags, _addr = sock.recvmsg(1, anc_bufsize)
    if not (msg or ancdata):
        raise EOFError
    try:
        if ACKNOWLEDGE:
            sock.send(b'A')
        item_count = len(ancdata)
        if item_count != 1:
            raise RuntimeError('received %d items of ancdata' %
                               item_count)
        level, kind, payload = ancdata[0]
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            if len(payload) % received.itemsize != 0:
                raise ValueError
            received.frombytes(payload)
            # The data byte holds the sender's fd count modulo 256.
            if len(received) % 256 != msg[0]:
                raise AssertionError(
                    "Len is {0:n} but msg[0] is {1!r}".format(
                        len(received), msg[0]))
            return list(received)
    except (ValueError, IndexError):
        # Malformed payloads are reported by the RuntimeError below.
        pass
    raise RuntimeError('Invalid data received')
| 180 | |
def send_handle(conn, handle, destination_pid):
    '''Send a handle over a local connection.'''
    # destination_pid is not used in this branch.
    unix_sock = socket.fromfd(
        conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with unix_sock as s:
        sendfds(s, [handle])
Charles-François Natali | dc863dd | 2011-09-24 20:04:29 +0200 | [diff] [blame] | 185 | |
def recv_handle(conn):
    '''Receive a handle over a local connection.'''
    unix_sock = socket.fromfd(
        conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with unix_sock as s:
        # Exactly one fd is requested; return it unwrapped.
        fds = recvfds(s, 1)
        return fds[0]
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 190 | |
def DupFd(fd):
    '''Return a wrapper for an fd.'''
    spawner = context.get_spawning_popen()
    if spawner is not None:
        # A spawning Popen object is active: let it wrap the fd.
        wrap = spawner.DupFd
        return wrap(spawner.duplicate_for_child(fd))
    if not HAVE_SEND_HANDLE:
        raise ValueError('SCM_RIGHTS appears not to be available')
    from . import resource_sharer
    return resource_sharer.DupFd(fd)
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 201 | |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 202 | # |
| 203 | # Try making some callable types picklable |
| 204 | # |
| 205 | |
| 206 | def _reduce_method(m): |
| 207 | if m.__self__ is None: |
| 208 | return getattr, (m.__class__, m.__func__.__name__) |
| 209 | else: |
| 210 | return getattr, (m.__self__, m.__func__.__name__) |
# Minimal class whose only purpose is to provide a bound method
# (_C().f) whose type can be registered with the pickler.
class _C:
    def f(self):
        pass
# Register _reduce_method for the type of a bound method.
register(type(_C().f), _reduce_method)
| 215 | |
| 216 | |
| 217 | def _reduce_method_descriptor(m): |
| 218 | return getattr, (m.__objclass__, m.__name__) |
# list.append and int.__add__ have different descriptor types in CPython,
# so the reducer is registered for both.
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
| 221 | |
| 222 | |
def _reduce_partial(p):
    '''Reduce a functools.partial to its func, args and keywords.'''
    keywords = p.keywords
    if not keywords:
        # _rebuild_partial() unpacks this with **, so it must be a mapping.
        keywords = {}
    return _rebuild_partial, (p.func, p.args, keywords)
| 225 | def _rebuild_partial(func, args, keywords): |
| 226 | return functools.partial(func, *args, **keywords) |
# Register _reduce_partial as the reducer for functools.partial objects.
register(functools.partial, _reduce_partial)
| 228 | |
| 229 | # |
| 230 | # Make sockets picklable |
| 231 | # |
| 232 | |
| 233 | if sys.platform == 'win32': |
def _reduce_socket(s):
    '''Reduce a socket to a DupSocket wrapper plus the rebuild function.'''
    from .resource_sharer import DupSocket
    wrapped = DupSocket(s)
    return _rebuild_socket, (wrapped,)
| 237 | def _rebuild_socket(ds): |
| 238 | return ds.detach() |
# Register the DupSocket-based reducer for socket objects.
register(socket.socket, _reduce_socket)
| 240 | |
| 241 | else: |
def _reduce_socket(s):
    '''Reduce a socket to a wrapped fd plus its family, type and proto.'''
    wrapped_fd = DupFd(s.fileno())
    rebuild_args = (wrapped_fd, s.family, s.type, s.proto)
    return _rebuild_socket, rebuild_args
| 245 | def _rebuild_socket(df, family, type, proto): |
Antoine Pitrou | 5438ed1 | 2012-04-24 22:56:57 +0200 | [diff] [blame] | 246 | fd = df.detach() |
Richard Oudkerk | 84ed9a6 | 2013-08-14 15:35:41 +0100 | [diff] [blame] | 247 | return socket.socket(family, type, proto, fileno=fd) |
# Register the fd-based reducer for socket objects.
register(socket.socket, _reduce_socket)
Davin Potts | 5458647 | 2016-09-09 18:03:10 -0500 | [diff] [blame] | 249 | |
| 250 | |
class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for use in implementing a Reduction class
    suitable for use in replacing the standard reduction mechanism
    used in multiprocessing.

    The module-level reduction helpers are re-exported as class
    attributes, and instantiation re-registers the default reducers.
    '''
    # Pickler machinery.
    ForkingPickler = ForkingPickler
    register = register
    dump = dump

    # Handle passing.
    send_handle = send_handle
    recv_handle = recv_handle

    # Platform-specific helpers: only one set exists in this module.
    if sys.platform != 'win32':
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd
    else:
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle

    # Reduction helpers.
    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        '''Register the default reducers; positional arguments are ignored.'''
        default_reducers = (
            (type(_C().f), _reduce_method),
            (type(list.append), _reduce_method_descriptor),
            (type(int.__add__), _reduce_method_descriptor),
            (functools.partial, _reduce_partial),
            (socket.socket, _reduce_socket),
        )
        for kind, reducer in default_reducers:
            register(kind, reducer)