blob: deca19ccad7b7f236304db5237ab90d032ae0ffc [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002# Module which deals with pickling of objects.
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# multiprocessing/reduction.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
Victor Stinnerd6debb22017-03-27 16:05:26 +020010from abc import ABCMeta
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010011import copyreg
12import functools
13import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import os
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010015import pickle
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010017import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000018
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010019from . import context
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010021__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
22
23
24HAVE_SEND_HANDLE = (sys.platform == 'win32' or
25 (hasattr(socket, 'CMSG_LEN') and
26 hasattr(socket, 'SCM_RIGHTS') and
27 hasattr(socket.socket, 'sendmsg')))
Benjamin Petersone711caf2008-06-11 16:44:04 +000028
29#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010030# Pickler subclass
Benjamin Petersone711caf2008-06-11 16:44:04 +000031#
32
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010033class ForkingPickler(pickle.Pickler):
34 '''Pickler subclass used by multiprocessing.'''
35 _extra_reducers = {}
36 _copyreg_dispatch_table = copyreg.dispatch_table
37
38 def __init__(self, *args):
39 super().__init__(*args)
40 self.dispatch_table = self._copyreg_dispatch_table.copy()
41 self.dispatch_table.update(self._extra_reducers)
42
43 @classmethod
44 def register(cls, type, reduce):
45 '''Register a reduce function for a type.'''
46 cls._extra_reducers[type] = reduce
47
48 @classmethod
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
54 loads = pickle.loads
55
56register = ForkingPickler.register
57
58def dump(obj, file, protocol=None):
59 '''Replacement for pickle.dump() using ForkingPickler.'''
60 ForkingPickler(file, protocol).dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +000061
62#
63# Platform specific definitions
64#
65
66if sys.platform == 'win32':
Antoine Pitrou5438ed12012-04-24 22:56:57 +020067 # Windows
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010068 __all__ += ['DupHandle', 'duplicate', 'steal_handle']
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020069 import _winapi
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010071 def duplicate(handle, target_process=None, inheritable=False):
72 '''Duplicate a handle. (target_process is a handle not a pid!)'''
73 if target_process is None:
74 target_process = _winapi.GetCurrentProcess()
75 return _winapi.DuplicateHandle(
76 _winapi.GetCurrentProcess(), handle, target_process,
77 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
78
79 def steal_handle(source_pid, handle):
80 '''Steal a handle from process identified by source_pid.'''
81 source_process_handle = _winapi.OpenProcess(
82 _winapi.PROCESS_DUP_HANDLE, False, source_pid)
83 try:
84 return _winapi.DuplicateHandle(
85 source_process_handle, handle,
86 _winapi.GetCurrentProcess(), 0, False,
87 _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
88 finally:
89 _winapi.CloseHandle(source_process_handle)
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091 def send_handle(conn, handle, destination_pid):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010092 '''Send a handle over a local connection.'''
Antoine Pitrou5438ed12012-04-24 22:56:57 +020093 dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
94 conn.send(dh)
Benjamin Petersone711caf2008-06-11 16:44:04 +000095
96 def recv_handle(conn):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010097 '''Receive a handle over a local connection.'''
Antoine Pitrou5438ed12012-04-24 22:56:57 +020098 return conn.recv().detach()
99
100 class DupHandle(object):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100101 '''Picklable wrapper for a handle.'''
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200102 def __init__(self, handle, access, pid=None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200103 if pid is None:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100104 # We just duplicate the handle in the current process and
105 # let the receiving process steal the handle.
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200106 pid = os.getpid()
107 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
108 try:
109 self._handle = _winapi.DuplicateHandle(
110 _winapi.GetCurrentProcess(),
111 handle, proc, access, False, 0)
112 finally:
113 _winapi.CloseHandle(proc)
114 self._access = access
115 self._pid = pid
116
117 def detach(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100118 '''Get the handle. This should only be called once.'''
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200119 # retrieve handle from process which currently owns it
120 if self._pid == os.getpid():
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100121 # The handle has already been duplicated for this process.
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200122 return self._handle
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100123 # We must steal the handle from the process whose pid is self._pid.
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200124 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
125 self._pid)
126 try:
127 return _winapi.DuplicateHandle(
128 proc, self._handle, _winapi.GetCurrentProcess(),
129 self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
130 finally:
131 _winapi.CloseHandle(proc)
132
Benjamin Petersone711caf2008-06-11 16:44:04 +0000133else:
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200134 # Unix
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100135 __all__ += ['DupFd', 'sendfds', 'recvfds']
136 import array
Richard Oudkerk04ec8ce2012-08-16 16:48:55 +0100137
138 # On MacOSX we should acknowledge receipt of fds -- see Issue14669
139 ACKNOWLEDGE = sys.platform == 'darwin'
140
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100141 def sendfds(sock, fds):
142 '''Send an array of fds over an AF_UNIX socket.'''
143 fds = array.array('i', fds)
144 msg = bytes([len(fds) % 256])
145 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
146 if ACKNOWLEDGE and sock.recv(1) != b'A':
Richard Oudkerk04ec8ce2012-08-16 16:48:55 +0100147 raise RuntimeError('did not receive acknowledgement of fd')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000148
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100149 def recvfds(sock, size):
150 '''Receive an array of fds over an AF_UNIX socket.'''
151 a = array.array('i')
152 bytes_size = a.itemsize * size
153 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
154 if not msg and not ancdata:
155 raise EOFError
156 try:
157 if ACKNOWLEDGE:
158 sock.send(b'A')
159 if len(ancdata) != 1:
160 raise RuntimeError('received %d items of ancdata' %
161 len(ancdata))
162 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
163 if (cmsg_level == socket.SOL_SOCKET and
164 cmsg_type == socket.SCM_RIGHTS):
165 if len(cmsg_data) % a.itemsize != 0:
166 raise ValueError
167 a.frombytes(cmsg_data)
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500168 if len(a) % 256 != msg[0]:
169 raise AssertionError(
170 "Len is {0:n} but msg[0] is {1!r}".format(
171 len(a), msg[0]))
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100172 return list(a)
173 except (ValueError, IndexError):
174 pass
175 raise RuntimeError('Invalid data received')
176
177 def send_handle(conn, handle, destination_pid):
178 '''Send a handle over a local connection.'''
Charles-François Natalidc863dd2011-09-24 20:04:29 +0200179 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100180 sendfds(s, [handle])
Charles-François Natalidc863dd2011-09-24 20:04:29 +0200181
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100182 def recv_handle(conn):
183 '''Receive a handle over a local connection.'''
184 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
185 return recvfds(s, 1)[0]
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200186
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100187 def DupFd(fd):
188 '''Return a wrapper for an fd.'''
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100189 popen_obj = context.get_spawning_popen()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100190 if popen_obj is not None:
191 return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
192 elif HAVE_SEND_HANDLE:
193 from . import resource_sharer
194 return resource_sharer.DupFd(fd)
195 else:
196 raise ValueError('SCM_RIGHTS appears not to be available')
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200197
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100198#
199# Try making some callable types picklable
200#
201
202def _reduce_method(m):
203 if m.__self__ is None:
204 return getattr, (m.__class__, m.__func__.__name__)
205 else:
206 return getattr, (m.__self__, m.__func__.__name__)
207class _C:
208 def f(self):
209 pass
210register(type(_C().f), _reduce_method)
211
212
213def _reduce_method_descriptor(m):
214 return getattr, (m.__objclass__, m.__name__)
215register(type(list.append), _reduce_method_descriptor)
216register(type(int.__add__), _reduce_method_descriptor)
217
218
219def _reduce_partial(p):
220 return _rebuild_partial, (p.func, p.args, p.keywords or {})
221def _rebuild_partial(func, args, keywords):
222 return functools.partial(func, *args, **keywords)
223register(functools.partial, _reduce_partial)
224
225#
226# Make sockets picklable
227#
228
229if sys.platform == 'win32':
230 def _reduce_socket(s):
231 from .resource_sharer import DupSocket
232 return _rebuild_socket, (DupSocket(s),)
233 def _rebuild_socket(ds):
234 return ds.detach()
235 register(socket.socket, _reduce_socket)
236
237else:
238 def _reduce_socket(s):
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200239 df = DupFd(s.fileno())
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100240 return _rebuild_socket, (df, s.family, s.type, s.proto)
241 def _rebuild_socket(df, family, type, proto):
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200242 fd = df.detach()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100243 return socket.socket(family, type, proto, fileno=fd)
244 register(socket.socket, _reduce_socket)
Davin Potts54586472016-09-09 18:03:10 -0500245
246
247class AbstractReducer(metaclass=ABCMeta):
248 '''Abstract base class for use in implementing a Reduction class
249 suitable for use in replacing the standard reduction mechanism
250 used in multiprocessing.'''
251 ForkingPickler = ForkingPickler
252 register = register
253 dump = dump
254 send_handle = send_handle
255 recv_handle = recv_handle
256
257 if sys.platform == 'win32':
258 steal_handle = steal_handle
259 duplicate = duplicate
260 DupHandle = DupHandle
261 else:
262 sendfds = sendfds
263 recvfds = recvfds
264 DupFd = DupFd
265
266 _reduce_method = _reduce_method
267 _reduce_method_descriptor = _reduce_method_descriptor
268 _rebuild_partial = _rebuild_partial
269 _reduce_socket = _reduce_socket
270 _rebuild_socket = _rebuild_socket
271
272 def __init__(self, *args):
273 register(type(_C().f), _reduce_method)
274 register(type(list.append), _reduce_method_descriptor)
275 register(type(int.__add__), _reduce_method_descriptor)
276 register(functools.partial, _reduce_partial)
277 register(socket.socket, _reduce_socket)