blob: 656fa8ff6b074f8359df3095a3a3fba3b9b3e450 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module to allow connection and socket objects to be transferred
3# between processes
4#
5# multiprocessing/reduction.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Antoine Pitrou5438ed12012-04-24 22:56:57 +020011__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
Benjamin Petersone711caf2008-06-11 16:44:04 +000012
13import os
14import sys
15import socket
16import threading
Charles-François Natalidc863dd2011-09-24 20:04:29 +020017import struct
Antoine Pitrou92ff4e12012-04-27 23:51:03 +020018import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000019
Benjamin Petersone711caf2008-06-11 16:44:04 +000020from multiprocessing import current_process
Benjamin Petersone711caf2008-06-11 16:44:04 +000021from multiprocessing.util import register_after_fork, debug, sub_debug
Antoine Pitrou5438ed12012-04-24 22:56:57 +020022from multiprocessing.util import is_exiting, sub_warning
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
24
25#
26#
27#
28
# Refuse to import on platforms that are neither Windows nor provide the
# socket features (CMSG_LEN and SCM_RIGHTS) used below to pass descriptors.
if sys.platform != 'win32' and not (hasattr(socket, 'CMSG_LEN') and
                                    hasattr(socket, 'SCM_RIGHTS')):
    raise ImportError('pickling of connections not supported')
32
33#
34# Platform specific definitions
35#
36
37if sys.platform == 'win32':
Antoine Pitrou5438ed12012-04-24 22:56:57 +020038 # Windows
39 __all__ += ['reduce_pipe_connection']
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020040 import _winapi
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
def send_handle(conn, handle, destination_pid):
    """Wrap *handle* in a DupHandle for *destination_pid* and send it on *conn*.

    ``_winapi.DUPLICATE_SAME_ACCESS`` is passed as the DupHandle ``access``
    argument.
    """
    duplicate = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS,
                          destination_pid)
    conn.send(duplicate)
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
def recv_handle(conn):
    """Receive an object from *conn* and return the result of its detach()."""
    received = conn.recv()
    return received.detach()
48
class DupHandle(object):
    """Wrapper around a handle that has been duplicated into another process.

    The constructor duplicates *handle* into the process identified by *pid*
    (the current process when *pid* is None); detach() retrieves a usable
    handle in whichever process calls it.
    """

    def __init__(self, handle, access, pid=None):
        # Duplicate the handle for the process with the given pid.
        target_pid = os.getpid() if pid is None else pid
        target_proc = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, target_pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, target_proc, access, False, 0)
        finally:
            # The process handle is only needed for the duplication itself.
            _winapi.CloseHandle(target_proc)
        self._access = access
        self._pid = target_pid

    def detach(self):
        """Return the handle, fetching it from the owning process if needed."""
        if self._pid != os.getpid():
            # Another process owns the handle: duplicate it into this process
            # and close the source copy in the owner.
            owner = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                        self._pid)
            try:
                return _winapi.DuplicateHandle(
                    owner, self._handle, _winapi.GetCurrentProcess(),
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
            finally:
                _winapi.CloseHandle(owner)
        # The handle already lives in this process.
        return self._handle
76
class DupSocket(object):
    """Wrapper that hands a duplicate of a socket out via resource_sharer."""

    def __init__(self, sock):
        duplicate = sock.dup()

        def share_with(conn, pid):
            # Called with the requesting connection and its pid: send the
            # share() data for that pid as raw bytes.
            conn.send_bytes(duplicate.share(pid))

        self._id = resource_sharer.register(share_with, duplicate.close)

    def detach(self):
        """Fetch the shared data from the sharer and rebuild the socket."""
        conn = resource_sharer.get_connection(self._id)
        try:
            return socket.fromshare(conn.recv_bytes())
        finally:
            conn.close()
92
def reduce_socket(s):
    """Return the ``(rebuild_socket, args)`` reduction pair for socket *s*."""
    wrapped = DupSocket(s)
    return rebuild_socket, (wrapped,)
95
def rebuild_socket(ds):
    """Return whatever *ds*.detach() produces (counterpart of reduce_socket)."""
    sock = ds.detach()
    return sock
98
def reduce_connection(conn):
    """Return the ``(rebuild_connection, args)`` reduction pair for *conn*.

    A temporary socket object is built from the connection's handle so that
    it can be wrapped in a DupSocket.
    """
    with socket.fromfd(conn.fileno(), socket.AF_INET,
                       socket.SOCK_STREAM) as s:
        wrapped = DupSocket(s)
        return rebuild_connection, (wrapped, conn.readable, conn.writable)
104
def rebuild_connection(ds, readable, writable):
    """Build a Connection from the socket obtained through *ds*.

    *ds*.detach() yields a socket object; the value returned by that
    socket's own detach() is what gets passed to Connection.
    """
    from .connection import Connection
    sock = ds.detach()
    handle = sock.detach()
    return Connection(handle, readable, writable)
109
def reduce_pipe_connection(conn):
    """Return the ``(rebuild_pipe_connection, args)`` pair for *conn*."""
    # Request only the access rights matching the connection's direction(s).
    access = 0
    if conn.readable:
        access |= _winapi.FILE_GENERIC_READ
    if conn.writable:
        access |= _winapi.FILE_GENERIC_WRITE
    wrapped = DupHandle(conn.fileno(), access)
    return rebuild_pipe_connection, (wrapped, conn.readable, conn.writable)
115
def rebuild_pipe_connection(dh, readable, writable):
    """Build a PipeConnection from the handle obtained via *dh*.detach()."""
    from .connection import PipeConnection
    return PipeConnection(dh.detach(), readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000120
121else:
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200122 # Unix
Richard Oudkerk04ec8ce2012-08-16 16:48:55 +0100123
124 # On MacOSX we should acknowledge receipt of fds -- see Issue14669
125 ACKNOWLEDGE = sys.platform == 'darwin'
126
def send_handle(conn, handle, destination_pid):
    """Send the descriptor *handle* over *conn* as an SCM_RIGHTS message.

    *destination_pid* is not used by this implementation.  When ACKNOWLEDGE
    is true, the function then reads a message from *conn* and raises
    RuntimeError unless it is ``b'ACK'``.
    """
    with socket.fromfd(conn.fileno(), socket.AF_UNIX,
                       socket.SOCK_STREAM) as s:
        # The descriptor travels as ancillary data next to a one-byte payload.
        rights = (socket.SOL_SOCKET, socket.SCM_RIGHTS,
                  struct.pack("@i", handle))
        s.sendmsg([b'x'], [rights])
    if ACKNOWLEDGE:
        if conn.recv_bytes() != b'ACK':
            raise RuntimeError('did not receive acknowledgement of fd')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000133
def recv_handle(conn):
    """Receive a descriptor sent over *conn* as an SCM_RIGHTS message.

    Returns the descriptor as an int.  Raises RuntimeError('Invalid data
    received') when the first ancillary message is missing, malformed or of
    the wrong level/type.  When ACKNOWLEDGE is true, ``b'ACK'`` is sent back
    on *conn* before the ancillary data is inspected.
    """
    fd_size = struct.calcsize("@i")
    with socket.fromfd(conn.fileno(), socket.AF_UNIX,
                       socket.SOCK_STREAM) as s:
        msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(fd_size))
        try:
            if ACKNOWLEDGE:
                conn.send_bytes(b'ACK')
            level, kind, payload = ancdata[0]
            carries_fd = (level == socket.SOL_SOCKET and
                          kind == socket.SCM_RIGHTS)
            if carries_fd:
                (fd,) = struct.unpack("@i", payload[:fd_size])
                return fd
        except (ValueError, IndexError, struct.error):
            # Fall through to the generic error below.
            pass
        raise RuntimeError('Invalid data received')
148
class DupFd(object):
    """Wrapper that hands a duplicate of a descriptor out via resource_sharer."""

    def __init__(self, fd):
        duplicate = os.dup(fd)

        def send(conn, pid):
            # Called with the requesting connection and its pid.
            send_handle(conn, duplicate, pid)

        self._id = resource_sharer.register(send,
                                            lambda: os.close(duplicate))

    def detach(self):
        """Connect to the sharer and return the descriptor it sends."""
        conn = resource_sharer.get_connection(self._id)
        try:
            return recv_handle(conn)
        finally:
            conn.close()
164
def reduce_socket(s):
    """Return the ``(rebuild_socket, args)`` reduction pair for socket *s*."""
    wrapped = DupFd(s.fileno())
    rebuild_args = (wrapped, s.family, s.type, s.proto)
    return rebuild_socket, rebuild_args
168
def rebuild_socket(df, family, type, proto):
    """Rebuild a socket from the descriptor obtained via *df*.detach().

    *family*, *type* and *proto* are passed straight to socket.fromfd().
    The descriptor returned by detach() is always closed before returning,
    including when socket.fromfd() raises, so it cannot leak.
    """
    fd = df.detach()
    try:
        # fromfd() works on its own duplicate of fd, so the returned socket
        # stays valid after fd is closed below.
        return socket.fromfd(fd, family, type, proto)
    finally:
        os.close(fd)
174
def reduce_connection(conn):
    """Return the ``(rebuild_connection, args)`` reduction pair for *conn*."""
    wrapped = DupFd(conn.fileno())
    rebuild_args = (wrapped, conn.readable, conn.writable)
    return rebuild_connection, rebuild_args
178
def rebuild_connection(df, readable, writable):
    """Build a Connection from the descriptor obtained via *df*.detach()."""
    from .connection import Connection
    return Connection(df.detach(), readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000183
184#
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200185# Server which shares registered resources with clients
Benjamin Petersone711caf2008-06-11 16:44:04 +0000186#
187
class ResourceSharer(object):
    """Server which shares registered resources with clients.

    A resource is registered as a ``(send, close)`` pair of callables and is
    identified by ``(address, key)``.  A daemon thread accepts connections on
    a Listener; each client sends ``(key, pid)``, after which the matching
    ``send(conn, pid)`` is called followed by ``close()``.  Every entry is
    served at most once because it is popped from the cache.
    """

    def __init__(self):
        self._key = 0
        self._cache = {}
        self._old_locks = []
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        register_after_fork(self, ResourceSharer._afterfork)

    def register(self, send, close):
        """Register a resource and return its ``(address, key)`` identifier.

        The listener and serving thread are started on first use.
        """
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        """Return a connection on which the resource *ident* will be sent.

        *ident* is an ``(address, key)`` pair as returned by register().
        """
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=current_process().authkey)
        try:
            c.send((key, os.getpid()))
        except Exception:
            # Do not leak the client connection if the request fails.
            c.close()
            raise
        return c

    def stop(self, timeout=None):
        """Ask the serving thread to stop and release all cached resources.

        Waits up to *timeout* seconds for the thread to finish; a warning is
        logged if it is still alive afterwards.  Does nothing when the
        sharer was never started.
        """
        from .connection import Client
        with self._lock:
            if self._address is not None:
                c = Client(self._address, authkey=current_process().authkey)
                try:
                    # None is the stop message understood by _serve().
                    c.send(None)
                finally:
                    c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    sub_warning('ResourceSharer thread did not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                for send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        """Reset the sharer's state (registered to run after a fork)."""
        for send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        """Create the listener and start the daemon thread running _serve()."""
        from .connection import Listener
        assert self._listener is None
        debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        """Serve requests until a ``None`` message is received."""
        if hasattr(signal, 'pthread_sigmask'):
            # Block signals in this thread.  Prefer valid_signals() where it
            # exists; range(1, NSIG) may include numbers the platform rejects.
            if hasattr(signal, 'valid_signals'):
                blocked = signal.valid_signals()
            else:
                blocked = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, blocked)
        while True:
            try:
                conn = self._listener.accept()
                try:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        # The entry is already out of the cache, so release
                        # the resource even if sending failed.
                        close()
                finally:
                    # Always close the accepted connection, including on the
                    # stop message and on errors.
                    conn.close()
            except Exception:
                if not is_exiting():
                    import traceback
                    sub_warning(
                        'thread for sharing handles raised exception :\n' +
                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
                        )


resource_sharer = ResourceSharer()