| # |
| # Module which deals with pickling of objects. |
| # |
| # multiprocessing/reduction.py |
| # |
| # Copyright (c) 2006-2008, R Oudkerk |
| # Licensed to PSF under a Contributor Agreement. |
| # |
| |
| import copyreg |
| import functools |
| import io |
| import os |
| import pickle |
| import socket |
| import sys |
| |
| from . import popen |
| from . import util |
| |
| __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] |
| |
| |
| HAVE_SEND_HANDLE = (sys.platform == 'win32' or |
| (hasattr(socket, 'CMSG_LEN') and |
| hasattr(socket, 'SCM_RIGHTS') and |
| hasattr(socket.socket, 'sendmsg'))) |
| |
| # |
| # Pickler subclass |
| # |
| |
| class ForkingPickler(pickle.Pickler): |
| '''Pickler subclass used by multiprocessing.''' |
| _extra_reducers = {} |
| _copyreg_dispatch_table = copyreg.dispatch_table |
| |
| def __init__(self, *args): |
| super().__init__(*args) |
| self.dispatch_table = self._copyreg_dispatch_table.copy() |
| self.dispatch_table.update(self._extra_reducers) |
| |
| @classmethod |
| def register(cls, type, reduce): |
| '''Register a reduce function for a type.''' |
| cls._extra_reducers[type] = reduce |
| |
| @classmethod |
| def dumps(cls, obj, protocol=None): |
| buf = io.BytesIO() |
| cls(buf, protocol).dump(obj) |
| return buf.getbuffer() |
| |
| loads = pickle.loads |
| |
| register = ForkingPickler.register |
| |
| def dump(obj, file, protocol=None): |
| '''Replacement for pickle.dump() using ForkingPickler.''' |
| ForkingPickler(file, protocol).dump(obj) |
| |
| # |
| # Platform specific definitions |
| # |
| |
| if sys.platform == 'win32': |
| # Windows |
| __all__ += ['DupHandle', 'duplicate', 'steal_handle'] |
| import _winapi |
| |
| def duplicate(handle, target_process=None, inheritable=False): |
| '''Duplicate a handle. (target_process is a handle not a pid!)''' |
| if target_process is None: |
| target_process = _winapi.GetCurrentProcess() |
| return _winapi.DuplicateHandle( |
| _winapi.GetCurrentProcess(), handle, target_process, |
| 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS) |
| |
| def steal_handle(source_pid, handle): |
| '''Steal a handle from process identified by source_pid.''' |
| source_process_handle = _winapi.OpenProcess( |
| _winapi.PROCESS_DUP_HANDLE, False, source_pid) |
| try: |
| return _winapi.DuplicateHandle( |
| source_process_handle, handle, |
| _winapi.GetCurrentProcess(), 0, False, |
| _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE) |
| finally: |
| _winapi.CloseHandle(source_process_handle) |
| |
| def send_handle(conn, handle, destination_pid): |
| '''Send a handle over a local connection.''' |
| dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) |
| conn.send(dh) |
| |
| def recv_handle(conn): |
| '''Receive a handle over a local connection.''' |
| return conn.recv().detach() |
| |
| class DupHandle(object): |
| '''Picklable wrapper for a handle.''' |
| def __init__(self, handle, access, pid=None): |
| if pid is None: |
| # We just duplicate the handle in the current process and |
| # let the receiving process steal the handle. |
| pid = os.getpid() |
| proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) |
| try: |
| self._handle = _winapi.DuplicateHandle( |
| _winapi.GetCurrentProcess(), |
| handle, proc, access, False, 0) |
| finally: |
| _winapi.CloseHandle(proc) |
| self._access = access |
| self._pid = pid |
| |
| def detach(self): |
| '''Get the handle. This should only be called once.''' |
| # retrieve handle from process which currently owns it |
| if self._pid == os.getpid(): |
| # The handle has already been duplicated for this process. |
| return self._handle |
| # We must steal the handle from the process whose pid is self._pid. |
| proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, |
| self._pid) |
| try: |
| return _winapi.DuplicateHandle( |
| proc, self._handle, _winapi.GetCurrentProcess(), |
| self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) |
| finally: |
| _winapi.CloseHandle(proc) |
| |
| else: |
| # Unix |
| __all__ += ['DupFd', 'sendfds', 'recvfds'] |
| import array |
| |
| # On MacOSX we should acknowledge receipt of fds -- see Issue14669 |
| ACKNOWLEDGE = sys.platform == 'darwin' |
| |
| def sendfds(sock, fds): |
| '''Send an array of fds over an AF_UNIX socket.''' |
| fds = array.array('i', fds) |
| msg = bytes([len(fds) % 256]) |
| sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) |
| if ACKNOWLEDGE and sock.recv(1) != b'A': |
| raise RuntimeError('did not receive acknowledgement of fd') |
| |
| def recvfds(sock, size): |
| '''Receive an array of fds over an AF_UNIX socket.''' |
| a = array.array('i') |
| bytes_size = a.itemsize * size |
| msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) |
| if not msg and not ancdata: |
| raise EOFError |
| try: |
| if ACKNOWLEDGE: |
| sock.send(b'A') |
| if len(ancdata) != 1: |
| raise RuntimeError('received %d items of ancdata' % |
| len(ancdata)) |
| cmsg_level, cmsg_type, cmsg_data = ancdata[0] |
| if (cmsg_level == socket.SOL_SOCKET and |
| cmsg_type == socket.SCM_RIGHTS): |
| if len(cmsg_data) % a.itemsize != 0: |
| raise ValueError |
| a.frombytes(cmsg_data) |
| assert len(a) % 256 == msg[0] |
| return list(a) |
| except (ValueError, IndexError): |
| pass |
| raise RuntimeError('Invalid data received') |
| |
| def send_handle(conn, handle, destination_pid): |
| '''Send a handle over a local connection.''' |
| with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| sendfds(s, [handle]) |
| |
| def recv_handle(conn): |
| '''Receive a handle over a local connection.''' |
| with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| return recvfds(s, 1)[0] |
| |
| def DupFd(fd): |
| '''Return a wrapper for an fd.''' |
| popen_obj = popen.get_spawning_popen() |
| if popen_obj is not None: |
| return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) |
| elif HAVE_SEND_HANDLE: |
| from . import resource_sharer |
| return resource_sharer.DupFd(fd) |
| else: |
| raise ValueError('SCM_RIGHTS appears not to be available') |
| |
| # |
| # Try making some callable types picklable |
| # |
| |
| def _reduce_method(m): |
| if m.__self__ is None: |
| return getattr, (m.__class__, m.__func__.__name__) |
| else: |
| return getattr, (m.__self__, m.__func__.__name__) |
| class _C: |
| def f(self): |
| pass |
| register(type(_C().f), _reduce_method) |
| |
| |
| def _reduce_method_descriptor(m): |
| return getattr, (m.__objclass__, m.__name__) |
| register(type(list.append), _reduce_method_descriptor) |
| register(type(int.__add__), _reduce_method_descriptor) |
| |
| |
| def _reduce_partial(p): |
| return _rebuild_partial, (p.func, p.args, p.keywords or {}) |
| def _rebuild_partial(func, args, keywords): |
| return functools.partial(func, *args, **keywords) |
| register(functools.partial, _reduce_partial) |
| |
| # |
| # Make sockets picklable |
| # |
| |
| if sys.platform == 'win32': |
| def _reduce_socket(s): |
| from .resource_sharer import DupSocket |
| return _rebuild_socket, (DupSocket(s),) |
| def _rebuild_socket(ds): |
| return ds.detach() |
| register(socket.socket, _reduce_socket) |
| |
| else: |
| def _reduce_socket(s): |
| df = DupFd(s.fileno()) |
| return _rebuild_socket, (df, s.family, s.type, s.proto) |
| def _rebuild_socket(df, family, type, proto): |
| fd = df.detach() |
| return socket.socket(family, type, proto, fileno=fd) |
| register(socket.socket, _reduce_socket) |