blob: 8f209b47dab7985aa9b8433b0feab047dfd54341 [file] [log] [blame]
#
# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
9
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010010import copyreg
11import functools
12import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000013import os
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010014import pickle
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import socket
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010016import sys
Benjamin Petersone711caf2008-06-11 16:44:04 +000017
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010018from . import context
Benjamin Petersone711caf2008-06-11 16:44:04 +000019
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010020__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
21
22
# True on Windows; elsewhere true only when the socket module exposes
# everything needed for SCM_RIGHTS ancillary messages.
HAVE_SEND_HANDLE = (
    sys.platform == 'win32'
    or all((
        hasattr(socket, 'CMSG_LEN'),
        hasattr(socket, 'SCM_RIGHTS'),
        hasattr(socket.socket, 'sendmsg'),
    ))
)
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
28#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010029# Pickler subclass
Benjamin Petersone711caf2008-06-11 16:44:04 +000030#
31
class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Each instance gets its own dispatch table: a copy of the copyreg
    table overlaid with the reducers added through register().
    '''
    # Reducers added through register(); shared by the whole class.
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args):
        super().__init__(*args)
        # Assemble the per-instance table, then install it on the pickler.
        table = self._copyreg_dispatch_table.copy()
        table.update(self._extra_reducers)
        self.dispatch_table = table

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers.update({type: reduce})

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj with this class; return a view of the pickled bytes.'''
        stream = io.BytesIO()
        pickler = cls(stream, protocol)
        pickler.dump(obj)
        return stream.getbuffer()

    loads = pickle.loads
54
# Module-level shortcut for adding reducers to ForkingPickler.
register = ForkingPickler.register


def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.'''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
Benjamin Petersone711caf2008-06-11 16:44:04 +000060
61#
62# Platform specific definitions
63#
64
65if sys.platform == 'win32':
Antoine Pitrou5438ed12012-04-24 22:56:57 +020066 # Windows
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010067 __all__ += ['DupHandle', 'duplicate', 'steal_handle']
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020068 import _winapi
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
def duplicate(handle, target_process=None, inheritable=False,
              *, source_process=None):
    '''Duplicate a handle.  (target_process is a handle not a pid!)

    Windows only.  The handle is duplicated from source_process into
    target_process with DUPLICATE_SAME_ACCESS; both are process handles
    and both default to the current process.  inheritable is passed
    through to DuplicateHandle.  Returns whatever DuplicateHandle
    returns for the new handle.
    '''
    current_process = _winapi.GetCurrentProcess()
    if source_process is None:
        source_process = current_process
    if target_process is None:
        target_process = current_process
    return _winapi.DuplicateHandle(
        source_process, handle, target_process,
        0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
77
def steal_handle(source_pid, handle):
    '''Steal a handle from process identified by source_pid.'''
    owner = _winapi.OpenProcess(
        _winapi.PROCESS_DUP_HANDLE, False, source_pid)
    try:
        # DUPLICATE_CLOSE_SOURCE is what turns the copy into a "steal".
        flags = (_winapi.DUPLICATE_SAME_ACCESS |
                 _winapi.DUPLICATE_CLOSE_SOURCE)
        stolen = _winapi.DuplicateHandle(
            owner, handle, _winapi.GetCurrentProcess(), 0, False, flags)
    finally:
        # The process handle is only needed for the duplication itself.
        _winapi.CloseHandle(owner)
    return stolen
89
def send_handle(conn, handle, destination_pid):
    '''Send a handle over a local connection.'''
    # Wrap the handle in a picklable DupHandle and send that instead.
    conn.send(
        DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
    )
Benjamin Petersone711caf2008-06-11 16:44:04 +000094
def recv_handle(conn):
    '''Receive a handle over a local connection.'''
    wrapper = conn.recv()
    return wrapper.detach()
98
class DupHandle(object):
    '''Picklable wrapper for a handle.'''

    def __init__(self, handle, access, pid=None):
        # With no pid given, the handle is duplicated inside the current
        # process and the receiving process steals it in detach().
        owner_pid = os.getpid() if pid is None else pid
        owner = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, owner_pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, owner, access, False, 0)
        finally:
            _winapi.CloseHandle(owner)
        self._access = access
        self._pid = owner_pid

    def detach(self):
        '''Get the handle.  This should only be called once.'''
        owner_pid = self._pid
        if owner_pid == os.getpid():
            # Already duplicated for this very process: nothing to steal.
            return self._handle
        # Otherwise take the handle from the process that owns it,
        # closing the source handle as part of the duplication.
        owner = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, owner_pid)
        try:
            taken = _winapi.DuplicateHandle(
                owner, self._handle, _winapi.GetCurrentProcess(),
                self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            _winapi.CloseHandle(owner)
        return taken
131
Benjamin Petersone711caf2008-06-11 16:44:04 +0000132else:
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200133 # Unix
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100134 __all__ += ['DupFd', 'sendfds', 'recvfds']
135 import array
Richard Oudkerk04ec8ce2012-08-16 16:48:55 +0100136
# On MacOSX we should acknowledge receipt of fds -- see Issue14669.
# When set, the receiver sends back a single b'A' byte and the sender
# waits for it.
ACKNOWLEDGE = (sys.platform == 'darwin')
139
def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.'''
    fd_array = array.array('i', fds)
    # One data byte carries the fd count modulo 256; the descriptors
    # themselves travel as SCM_RIGHTS ancillary data.
    count_byte = bytes([len(fd_array) % 256])
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)]
    sock.sendmsg([count_byte], ancillary)
    if not ACKNOWLEDGE:
        return
    if sock.recv(1) != b'A':
        raise RuntimeError('did not receive acknowledgement of fd')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000147
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    size is the number of fds the ancillary-data buffer is sized for.
    Returns the received fds as a list of ints.

    Raises EOFError when neither data nor ancillary data was received,
    RuntimeError when the ancillary data is missing or malformed, and
    AssertionError when the number of fds does not match the count byte
    sent alongside them.
    '''
    a = array.array('i')
    bytes_size = a.itemsize * size
    msg, ancdata, _, _ = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        if ACKNOWLEDGE:
            sock.send(b'A')
        if len(ancdata) != 1:
            raise RuntimeError('received %d items of ancdata' %
                               len(ancdata))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
            if len(cmsg_data) % a.itemsize != 0:
                raise ValueError
            a.frombytes(cmsg_data)
            # Raised explicitly (not via ``assert``) so the check on
            # received data is still performed under ``python -O``.
            if len(a) % 256 != msg[0]:
                raise AssertionError(
                    'Len is {0:n} but msg[0] is {1!r}'.format(
                        len(a), msg[0]))
            return list(a)
    except (ValueError, IndexError):
        # Truncated payload or empty data byte: report it as invalid below.
        pass
    raise RuntimeError('Invalid data received')
172
def send_handle(conn, handle, destination_pid):
    '''Send a handle over a local connection.'''
    # destination_pid is accepted but not used by this implementation.
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        sendfds(sock, [handle])
Charles-François Natalidc863dd2011-09-24 20:04:29 +0200177
def recv_handle(conn):
    '''Receive a handle over a local connection.'''
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        # Exactly one fd is requested; hand back that single descriptor.
        received = recvfds(sock, 1)
        return received[0]
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200182
def DupFd(fd):
    '''Return a wrapper for an fd.'''
    popen_obj = context.get_spawning_popen()
    if popen_obj is not None:
        # A child is being spawned right now: let its Popen object
        # duplicate and wrap the fd.
        child_fd = popen_obj.duplicate_for_child(fd)
        return popen_obj.DupFd(child_fd)
    if not HAVE_SEND_HANDLE:
        raise ValueError('SCM_RIGHTS appears not to be available')
    # Imported only when this path is actually taken.
    from . import resource_sharer
    return resource_sharer.DupFd(fd)
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200193
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100194#
195# Try making some callable types picklable
196#
197
198def _reduce_method(m):
199 if m.__self__ is None:
200 return getattr, (m.__class__, m.__func__.__name__)
201 else:
202 return getattr, (m.__self__, m.__func__.__name__)
class _C:
    # Exists only so that a bound method object is available below.
    def f(self):
        pass


# type(_C().f) is the type of bound methods.
register(type=type(_C().f), reduce=_reduce_method)
207
208
209def _reduce_method_descriptor(m):
210 return getattr, (m.__objclass__, m.__name__)
# One reducer serves both kinds of descriptor: the type of list.append
# and the type of int.__add__.
register(type=type(list.append), reduce=_reduce_method_descriptor)
register(type=type(int.__add__), reduce=_reduce_method_descriptor)
213
214
215def _reduce_partial(p):
216 return _rebuild_partial, (p.func, p.args, p.keywords or {})
217def _rebuild_partial(func, args, keywords):
218 return functools.partial(func, *args, **keywords)
# Pickle functools.partial objects through _reduce_partial.
register(type=functools.partial, reduce=_reduce_partial)
220
221#
222# Make sockets picklable
223#
224
225if sys.platform == 'win32':
def _reduce_socket(s):
    '''Reduce a socket to a DupSocket wrapper (Windows implementation).'''
    # Imported at call time rather than at module import.
    from .resource_sharer import DupSocket
    wrapper = DupSocket(s)
    return _rebuild_socket, (wrapper,)
229 def _rebuild_socket(ds):
230 return ds.detach()
# Pickle sockets through the reducer defined for this platform.
register(type=socket.socket, reduce=_reduce_socket)
232
233else:
def _reduce_socket(s):
    '''Reduce a socket to a wrapped fd plus family, type and proto.'''
    state = (DupFd(s.fileno()), s.family, s.type, s.proto)
    return _rebuild_socket, state
237 def _rebuild_socket(df, family, type, proto):
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200238 fd = df.detach()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100239 return socket.socket(family, type, proto, fileno=fd)
# Pickle sockets through the reducer defined for this platform.
register(type=socket.socket, reduce=_reduce_socket)