blob: 84d2fe9ab21c25dd0c8ce8398d95e7f50952304a [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module to allow connection and socket objects to be transferred
3# between processes
4#
5# multiprocessing/reduction.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01008# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00009#
10
Antoine Pitrou5438ed12012-04-24 22:56:57 +020011__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
Benjamin Petersone711caf2008-06-11 16:44:04 +000012
13import os
14import sys
15import socket
16import threading
Charles-François Natalidc863dd2011-09-24 20:04:29 +020017import struct
Antoine Pitrou92ff4e12012-04-27 23:51:03 +020018import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000019
Benjamin Petersone711caf2008-06-11 16:44:04 +000020from multiprocessing import current_process
Benjamin Petersone711caf2008-06-11 16:44:04 +000021from multiprocessing.util import register_after_fork, debug, sub_debug
Antoine Pitrou5438ed12012-04-24 22:56:57 +020022from multiprocessing.util import is_exiting, sub_warning
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
24
25#
26#
27#
28
Charles-François Natalidc863dd2011-09-24 20:04:29 +020029if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
30 hasattr(socket, 'SCM_RIGHTS'))):
Benjamin Petersone711caf2008-06-11 16:44:04 +000031 raise ImportError('pickling of connections not supported')
32
33#
34# Platform specific definitions
35#
36
37if sys.platform == 'win32':
Antoine Pitrou5438ed12012-04-24 22:56:57 +020038 # Windows
39 __all__ += ['reduce_pipe_connection']
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020040 import _winapi
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
42 def send_handle(conn, handle, destination_pid):
Antoine Pitrou5438ed12012-04-24 22:56:57 +020043 dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
44 conn.send(dh)
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
46 def recv_handle(conn):
Antoine Pitrou5438ed12012-04-24 22:56:57 +020047 return conn.recv().detach()
48
49 class DupHandle(object):
50 def __init__(self, handle, access, pid=None):
51 # duplicate handle for process with given pid
52 if pid is None:
53 pid = os.getpid()
54 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
55 try:
56 self._handle = _winapi.DuplicateHandle(
57 _winapi.GetCurrentProcess(),
58 handle, proc, access, False, 0)
59 finally:
60 _winapi.CloseHandle(proc)
61 self._access = access
62 self._pid = pid
63
64 def detach(self):
65 # retrieve handle from process which currently owns it
66 if self._pid == os.getpid():
67 return self._handle
68 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
69 self._pid)
70 try:
71 return _winapi.DuplicateHandle(
72 proc, self._handle, _winapi.GetCurrentProcess(),
73 self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
74 finally:
75 _winapi.CloseHandle(proc)
76
77 class DupSocket(object):
78 def __init__(self, sock):
79 new_sock = sock.dup()
80 def send(conn, pid):
81 share = new_sock.share(pid)
82 conn.send_bytes(share)
83 self._id = resource_sharer.register(send, new_sock.close)
84
85 def detach(self):
86 conn = resource_sharer.get_connection(self._id)
87 try:
88 share = conn.recv_bytes()
89 return socket.fromshare(share)
90 finally:
91 conn.close()
92
93 def reduce_socket(s):
94 return rebuild_socket, (DupSocket(s),)
95
96 def rebuild_socket(ds):
97 return ds.detach()
98
99 def reduce_connection(conn):
100 handle = conn.fileno()
101 with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
102 ds = DupSocket(s)
103 return rebuild_connection, (ds, conn.readable, conn.writable)
104
105 def rebuild_connection(ds, readable, writable):
106 from .connection import Connection
107 sock = ds.detach()
108 return Connection(sock.detach(), readable, writable)
109
110 def reduce_pipe_connection(conn):
111 access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
112 (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
113 dh = DupHandle(conn.fileno(), access)
114 return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
115
116 def rebuild_pipe_connection(dh, readable, writable):
117 from .connection import PipeConnection
118 handle = dh.detach()
119 return PipeConnection(handle, readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000120
121else:
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200122 # Unix
Benjamin Petersone711caf2008-06-11 16:44:04 +0000123 def send_handle(conn, handle, destination_pid):
Charles-François Natalidc863dd2011-09-24 20:04:29 +0200124 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
125 s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
126 struct.pack("@i", handle))])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000127
128 def recv_handle(conn):
Charles-François Natalidc863dd2011-09-24 20:04:29 +0200129 size = struct.calcsize("@i")
130 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
131 msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
132 try:
133 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
134 if (cmsg_level == socket.SOL_SOCKET and
135 cmsg_type == socket.SCM_RIGHTS):
136 return struct.unpack("@i", cmsg_data[:size])[0]
137 except (ValueError, IndexError, struct.error):
138 pass
139 raise RuntimeError('Invalid data received')
140
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200141 class DupFd(object):
142 def __init__(self, fd):
143 new_fd = os.dup(fd)
144 def send(conn, pid):
145 send_handle(conn, new_fd, pid)
146 def close():
147 os.close(new_fd)
148 self._id = resource_sharer.register(send, close)
149
150 def detach(self):
151 conn = resource_sharer.get_connection(self._id)
152 try:
153 return recv_handle(conn)
154 finally:
155 conn.close()
156
157 def reduce_socket(s):
158 df = DupFd(s.fileno())
159 return rebuild_socket, (df, s.family, s.type, s.proto)
160
161 def rebuild_socket(df, family, type, proto):
162 fd = df.detach()
163 s = socket.fromfd(fd, family, type, proto)
164 os.close(fd)
165 return s
166
167 def reduce_connection(conn):
168 df = DupFd(conn.fileno())
169 return rebuild_connection, (df, conn.readable, conn.writable)
170
171 def rebuild_connection(df, readable, writable):
172 from .connection import Connection
173 fd = df.detach()
174 return Connection(fd, readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175
176#
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200177# Server which shares registered resources with clients
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178#
179
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200180class ResourceSharer(object):
181 def __init__(self):
182 self._key = 0
183 self._cache = {}
184 self._old_locks = []
185 self._lock = threading.Lock()
186 self._listener = None
187 self._address = None
Antoine Pitrou92ff4e12012-04-27 23:51:03 +0200188 self._thread = None
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200189 register_after_fork(self, ResourceSharer._afterfork)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000190
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200191 def register(self, send, close):
192 with self._lock:
193 if self._address is None:
194 self._start()
195 self._key += 1
196 self._cache[self._key] = (send, close)
197 return (self._address, self._key)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000198
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200199 @staticmethod
200 def get_connection(ident):
201 from .connection import Client
202 address, key = ident
203 c = Client(address, authkey=current_process().authkey)
204 c.send((key, os.getpid()))
205 return c
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206
Antoine Pitrou92ff4e12012-04-27 23:51:03 +0200207 def stop(self, timeout=None):
208 from .connection import Client
209 with self._lock:
210 if self._address is not None:
211 c = Client(self._address, authkey=current_process().authkey)
212 c.send(None)
213 c.close()
214 self._thread.join(timeout)
215 if self._thread.is_alive():
216 sub_warn('ResourceSharer thread did not stop when asked')
217 self._listener.close()
218 self._thread = None
219 self._address = None
220 self._listener = None
221 for key, (send, close) in self._cache.items():
222 close()
223 self._cache.clear()
224
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200225 def _afterfork(self):
226 for key, (send, close) in self._cache.items():
227 close()
228 self._cache.clear()
229 # If self._lock was locked at the time of the fork, it may be broken
230 # -- see issue 6721. Replace it without letting it be gc'ed.
231 self._old_locks.append(self._lock)
232 self._lock = threading.Lock()
233 if self._listener is not None:
234 self._listener.close()
235 self._listener = None
236 self._address = None
Antoine Pitrou92ff4e12012-04-27 23:51:03 +0200237 self._thread = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200239 def _start(self):
240 from .connection import Listener
241 assert self._listener is None
242 debug('starting listener and thread for sending handles')
243 self._listener = Listener(authkey=current_process().authkey)
244 self._address = self._listener.address
245 t = threading.Thread(target=self._serve)
246 t.daemon = True
247 t.start()
Antoine Pitrou92ff4e12012-04-27 23:51:03 +0200248 self._thread = t
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200250 def _serve(self):
Antoine Pitrou92ff4e12012-04-27 23:51:03 +0200251 if hasattr(signal, 'pthread_sigmask'):
252 signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200253 while 1:
254 try:
255 conn = self._listener.accept()
Antoine Pitrou92ff4e12012-04-27 23:51:03 +0200256 msg = conn.recv()
257 if msg is None:
258 break
259 key, destination_pid = msg
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200260 send, close = self._cache.pop(key)
261 send(conn, destination_pid)
262 close()
263 conn.close()
264 except:
265 if not is_exiting():
266 import traceback
267 sub_warning(
268 'thread for sharing handles raised exception :\n' +
269 '-'*79 + '\n' + traceback.format_exc() + '-'*79
270 )
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200272resource_sharer = ResourceSharer()