#
# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
 |  | 
 | from abc import ABCMeta | 
 | import copyreg | 
 | import functools | 
 | import io | 
 | import os | 
 | import pickle | 
 | import socket | 
 | import sys | 
 |  | 
 | from . import context | 
 |  | 
 | __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] | 
 |  | 
 |  | 
 | HAVE_SEND_HANDLE = (sys.platform == 'win32' or | 
 |                     (hasattr(socket, 'CMSG_LEN') and | 
 |                      hasattr(socket, 'SCM_RIGHTS') and | 
 |                      hasattr(socket.socket, 'sendmsg'))) | 
 |  | 
#
# Pickler subclass
#
 |  | 
 | class ForkingPickler(pickle.Pickler): | 
 |     '''Pickler subclass used by multiprocessing.''' | 
 |     _extra_reducers = {} | 
 |     _copyreg_dispatch_table = copyreg.dispatch_table | 
 |  | 
 |     def __init__(self, *args): | 
 |         super().__init__(*args) | 
 |         self.dispatch_table = self._copyreg_dispatch_table.copy() | 
 |         self.dispatch_table.update(self._extra_reducers) | 
 |  | 
 |     @classmethod | 
 |     def register(cls, type, reduce): | 
 |         '''Register a reduce function for a type.''' | 
 |         cls._extra_reducers[type] = reduce | 
 |  | 
 |     @classmethod | 
 |     def dumps(cls, obj, protocol=None): | 
 |         buf = io.BytesIO() | 
 |         cls(buf, protocol).dump(obj) | 
 |         return buf.getbuffer() | 
 |  | 
 |     loads = pickle.loads | 
 |  | 
 | register = ForkingPickler.register | 
 |  | 
 | def dump(obj, file, protocol=None): | 
 |     '''Replacement for pickle.dump() using ForkingPickler.''' | 
 |     ForkingPickler(file, protocol).dump(obj) | 
 |  | 
#
# Platform specific definitions
#
 |  | 
 | if sys.platform == 'win32': | 
 |     # Windows | 
 |     __all__ += ['DupHandle', 'duplicate', 'steal_handle'] | 
 |     import _winapi | 
 |  | 
 |     def duplicate(handle, target_process=None, inheritable=False, | 
 |                   *, source_process=None): | 
 |         '''Duplicate a handle.  (target_process is a handle not a pid!)''' | 
 |         current_process = _winapi.GetCurrentProcess() | 
 |         if source_process is None: | 
 |             source_process = current_process | 
 |         if target_process is None: | 
 |             target_process = current_process | 
 |         return _winapi.DuplicateHandle( | 
 |             source_process, handle, target_process, | 
 |             0, inheritable, _winapi.DUPLICATE_SAME_ACCESS) | 
 |  | 
 |     def steal_handle(source_pid, handle): | 
 |         '''Steal a handle from process identified by source_pid.''' | 
 |         source_process_handle = _winapi.OpenProcess( | 
 |             _winapi.PROCESS_DUP_HANDLE, False, source_pid) | 
 |         try: | 
 |             return _winapi.DuplicateHandle( | 
 |                 source_process_handle, handle, | 
 |                 _winapi.GetCurrentProcess(), 0, False, | 
 |                 _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE) | 
 |         finally: | 
 |             _winapi.CloseHandle(source_process_handle) | 
 |  | 
 |     def send_handle(conn, handle, destination_pid): | 
 |         '''Send a handle over a local connection.''' | 
 |         dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) | 
 |         conn.send(dh) | 
 |  | 
 |     def recv_handle(conn): | 
 |         '''Receive a handle over a local connection.''' | 
 |         return conn.recv().detach() | 
 |  | 
 |     class DupHandle(object): | 
 |         '''Picklable wrapper for a handle.''' | 
 |         def __init__(self, handle, access, pid=None): | 
 |             if pid is None: | 
 |                 # We just duplicate the handle in the current process and | 
 |                 # let the receiving process steal the handle. | 
 |                 pid = os.getpid() | 
 |             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) | 
 |             try: | 
 |                 self._handle = _winapi.DuplicateHandle( | 
 |                     _winapi.GetCurrentProcess(), | 
 |                     handle, proc, access, False, 0) | 
 |             finally: | 
 |                 _winapi.CloseHandle(proc) | 
 |             self._access = access | 
 |             self._pid = pid | 
 |  | 
 |         def detach(self): | 
 |             '''Get the handle.  This should only be called once.''' | 
 |             # retrieve handle from process which currently owns it | 
 |             if self._pid == os.getpid(): | 
 |                 # The handle has already been duplicated for this process. | 
 |                 return self._handle | 
 |             # We must steal the handle from the process whose pid is self._pid. | 
 |             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, | 
 |                                        self._pid) | 
 |             try: | 
 |                 return _winapi.DuplicateHandle( | 
 |                     proc, self._handle, _winapi.GetCurrentProcess(), | 
 |                     self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) | 
 |             finally: | 
 |                 _winapi.CloseHandle(proc) | 
 |  | 
 | else: | 
 |     # Unix | 
 |     __all__ += ['DupFd', 'sendfds', 'recvfds'] | 
 |     import array | 
 |  | 
 |     # On MacOSX we should acknowledge receipt of fds -- see Issue14669 | 
 |     ACKNOWLEDGE = sys.platform == 'darwin' | 
 |  | 
 |     def sendfds(sock, fds): | 
 |         '''Send an array of fds over an AF_UNIX socket.''' | 
 |         fds = array.array('i', fds) | 
 |         msg = bytes([len(fds) % 256]) | 
 |         sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) | 
 |         if ACKNOWLEDGE and sock.recv(1) != b'A': | 
 |             raise RuntimeError('did not receive acknowledgement of fd') | 
 |  | 
 |     def recvfds(sock, size): | 
 |         '''Receive an array of fds over an AF_UNIX socket.''' | 
 |         a = array.array('i') | 
 |         bytes_size = a.itemsize * size | 
 |         msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size)) | 
 |         if not msg and not ancdata: | 
 |             raise EOFError | 
 |         try: | 
 |             if ACKNOWLEDGE: | 
 |                 sock.send(b'A') | 
 |             if len(ancdata) != 1: | 
 |                 raise RuntimeError('received %d items of ancdata' % | 
 |                                    len(ancdata)) | 
 |             cmsg_level, cmsg_type, cmsg_data = ancdata[0] | 
 |             if (cmsg_level == socket.SOL_SOCKET and | 
 |                 cmsg_type == socket.SCM_RIGHTS): | 
 |                 if len(cmsg_data) % a.itemsize != 0: | 
 |                     raise ValueError | 
 |                 a.frombytes(cmsg_data) | 
 |                 if len(a) % 256 != msg[0]: | 
 |                     raise AssertionError( | 
 |                         "Len is {0:n} but msg[0] is {1!r}".format( | 
 |                             len(a), msg[0])) | 
 |                 return list(a) | 
 |         except (ValueError, IndexError): | 
 |             pass | 
 |         raise RuntimeError('Invalid data received') | 
 |  | 
 |     def send_handle(conn, handle, destination_pid): | 
 |         '''Send a handle over a local connection.''' | 
 |         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: | 
 |             sendfds(s, [handle]) | 
 |  | 
 |     def recv_handle(conn): | 
 |         '''Receive a handle over a local connection.''' | 
 |         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: | 
 |             return recvfds(s, 1)[0] | 
 |  | 
 |     def DupFd(fd): | 
 |         '''Return a wrapper for an fd.''' | 
 |         popen_obj = context.get_spawning_popen() | 
 |         if popen_obj is not None: | 
 |             return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) | 
 |         elif HAVE_SEND_HANDLE: | 
 |             from . import resource_sharer | 
 |             return resource_sharer.DupFd(fd) | 
 |         else: | 
 |             raise ValueError('SCM_RIGHTS appears not to be available') | 
 |  | 
#
# Try making some callable types picklable
#
 |  | 
 | def _reduce_method(m): | 
 |     if m.__self__ is None: | 
 |         return getattr, (m.__class__, m.__func__.__name__) | 
 |     else: | 
 |         return getattr, (m.__self__, m.__func__.__name__) | 
# _C exists only so that the bound-method type can be obtained through
# type(_C().f).
class _C:
    def f(self):
        pass
# Make bound methods picklable by ForkingPickler via _reduce_method.
register(type(_C().f), _reduce_method)
 |  | 
 |  | 
 | def _reduce_method_descriptor(m): | 
 |     return getattr, (m.__objclass__, m.__name__) | 
# The types of list.append and int.__add__ are both registered with the
# same reducer, since it only needs __objclass__ and __name__.
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
 |  | 
 |  | 
 | def _reduce_partial(p): | 
 |     return _rebuild_partial, (p.func, p.args, p.keywords or {}) | 
 | def _rebuild_partial(func, args, keywords): | 
 |     return functools.partial(func, *args, **keywords) | 
# Pickle functools.partial objects through _reduce_partial.
register(functools.partial, _reduce_partial)
 |  | 
#
# Make sockets picklable
#
 |  | 
 | if sys.platform == 'win32': | 
 |     def _reduce_socket(s): | 
 |         from .resource_sharer import DupSocket | 
 |         return _rebuild_socket, (DupSocket(s),) | 
 |     def _rebuild_socket(ds): | 
 |         return ds.detach() | 
    # Pickle sockets with the Windows reducer defined in this branch.
    register(socket.socket, _reduce_socket)
 |  | 
 | else: | 
 |     def _reduce_socket(s): | 
 |         df = DupFd(s.fileno()) | 
 |         return _rebuild_socket, (df, s.family, s.type, s.proto) | 
 |     def _rebuild_socket(df, family, type, proto): | 
 |         fd = df.detach() | 
 |         return socket.socket(family, type, proto, fileno=fd) | 
    # Pickle sockets with the fd-passing reducer defined in this branch.
    register(socket.socket, _reduce_socket)
 |  | 
 |  | 
 | class AbstractReducer(metaclass=ABCMeta): | 
 |     '''Abstract base class for use in implementing a Reduction class | 
 |     suitable for use in replacing the standard reduction mechanism | 
 |     used in multiprocessing.''' | 
 |     ForkingPickler = ForkingPickler | 
 |     register = register | 
 |     dump = dump | 
 |     send_handle = send_handle | 
 |     recv_handle = recv_handle | 
 |  | 
 |     if sys.platform == 'win32': | 
 |         steal_handle = steal_handle | 
 |         duplicate = duplicate | 
 |         DupHandle = DupHandle | 
 |     else: | 
 |         sendfds = sendfds | 
 |         recvfds = recvfds | 
 |         DupFd = DupFd | 
 |  | 
 |     _reduce_method = _reduce_method | 
 |     _reduce_method_descriptor = _reduce_method_descriptor | 
 |     _rebuild_partial = _rebuild_partial | 
 |     _reduce_socket = _reduce_socket | 
 |     _rebuild_socket = _rebuild_socket | 
 |  | 
 |     def __init__(self, *args): | 
 |         register(type(_C().f), _reduce_method) | 
 |         register(type(list.append), _reduce_method_descriptor) | 
 |         register(type(int.__add__), _reduce_method_descriptor) | 
 |         register(functools.partial, _reduce_partial) | 
 |         register(socket.socket, _reduce_socket) |