blob: cef445b4d8468aa07c2d5e75bb70690ec62b4e4b [file] [log] [blame]
#
# Module to allow connection and socket objects to be transferred
# between processes
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
35
Antoine Pitrou5438ed12012-04-24 22:56:57 +020036__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38import os
39import sys
40import socket
41import threading
Charles-François Natalidc863dd2011-09-24 20:04:29 +020042import struct
Antoine Pitrou92ff4e12012-04-27 23:51:03 +020043import signal
Benjamin Petersone711caf2008-06-11 16:44:04 +000044
Benjamin Petersone711caf2008-06-11 16:44:04 +000045from multiprocessing import current_process
Benjamin Petersone711caf2008-06-11 16:44:04 +000046from multiprocessing.util import register_after_fork, debug, sub_debug
Antoine Pitrou5438ed12012-04-24 22:56:57 +020047from multiprocessing.util import is_exiting, sub_warning
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
49
50#
51#
52#
53
# Handle passing needs either Windows, or a socket module that exposes the
# ancillary-data helpers (CMSG_LEN / SCM_RIGHTS) used for fd passing below.
if sys.platform != 'win32' and not (hasattr(socket, 'CMSG_LEN') and
                                    hasattr(socket, 'SCM_RIGHTS')):
    raise ImportError('pickling of connections not supported')
57
#
# Platform specific definitions
#
61
62if sys.platform == 'win32':
Antoine Pitrou5438ed12012-04-24 22:56:57 +020063 # Windows
64 __all__ += ['reduce_pipe_connection']
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020065 import _winapi
Benjamin Petersone711caf2008-06-11 16:44:04 +000066
def send_handle(conn, handle, destination_pid):
    """Duplicate *handle* for process *destination_pid* and send it on *conn*.

    The handle is wrapped in a ``DupHandle`` (duplicated with the same access
    rights) and that wrapper object is what gets sent.
    """
    conn.send(
        DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid))
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
def recv_handle(conn):
    """Receive an object from *conn* and return the result of its ``detach()``."""
    received = conn.recv()
    return received.detach()
73
class DupHandle(object):
    """A handle duplicated into the process identified by a pid.

    ``detach()`` hands the handle over to whichever process calls it.
    """

    def __init__(self, handle, access, pid=None):
        """Duplicate *handle* with *access* rights for the process *pid*.

        When *pid* is None the current process is the target.
        """
        target_pid = os.getpid() if pid is None else pid
        target_proc = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, target_pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, target_proc, access, False, 0)
        finally:
            # The process handle is only needed for the duplication itself.
            _winapi.CloseHandle(target_proc)
        self._access = access
        self._pid = target_pid

    def detach(self):
        """Return the handle, retrieving it from the owning process if needed."""
        # Already owned by this process: nothing to duplicate.
        if self._pid == os.getpid():
            return self._handle
        owner = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, self._pid)
        try:
            # DUPLICATE_CLOSE_SOURCE closes the handle in the owning process.
            return _winapi.DuplicateHandle(
                owner, self._handle, _winapi.GetCurrentProcess(),
                self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            _winapi.CloseHandle(owner)
101
class DupSocket(object):
    """Stand-in for a socket that is handed over through ``resource_sharer``."""

    def __init__(self, sock):
        """Duplicate *sock* and register the duplicate with ``resource_sharer``."""
        duplicate = sock.dup()

        def _share_with(conn, pid):
            # Share the duplicate with process *pid* and ship the bytes.
            conn.send_bytes(duplicate.share(pid))

        self._id = resource_sharer.register(_share_with, duplicate.close)

    def detach(self):
        """Fetch the shared socket data and rebuild a socket from it."""
        conn = resource_sharer.get_connection(self._id)
        try:
            return socket.fromshare(conn.recv_bytes())
        finally:
            conn.close()
117
def reduce_socket(s):
    """Return a ``(callable, args)`` pair that rebuilds socket *s*."""
    dup = DupSocket(s)
    return rebuild_socket, (dup,)
120
def rebuild_socket(ds):
    """Return the result of detaching *ds* (counterpart of ``reduce_socket``)."""
    rebuilt = ds.detach()
    return rebuilt
123
def reduce_connection(conn):
    """Return a ``(callable, args)`` pair that rebuilds connection *conn*.

    A temporary socket object is created from the connection's handle so
    that it can be wrapped in a ``DupSocket``.
    """
    with socket.fromfd(conn.fileno(), socket.AF_INET,
                       socket.SOCK_STREAM) as tmp_sock:
        dup = DupSocket(tmp_sock)
        return rebuild_connection, (dup, conn.readable, conn.writable)
129
def rebuild_connection(ds, readable, writable):
    """Rebuild a ``Connection`` from the socket obtained by detaching *ds*."""
    from .connection import Connection
    # First detach() yields the socket, the second its underlying handle.
    handle = ds.detach().detach()
    return Connection(handle, readable, writable)
134
def reduce_pipe_connection(conn):
    """Return a ``(callable, args)`` pair that rebuilds pipe connection *conn*.

    The duplicated handle gets read and/or write access according to the
    connection's ``readable`` and ``writable`` flags.
    """
    access = 0
    if conn.readable:
        access |= _winapi.FILE_GENERIC_READ
    if conn.writable:
        access |= _winapi.FILE_GENERIC_WRITE
    dup = DupHandle(conn.fileno(), access)
    return rebuild_pipe_connection, (dup, conn.readable, conn.writable)
140
def rebuild_pipe_connection(dh, readable, writable):
    """Rebuild a ``PipeConnection`` from the handle obtained by detaching *dh*."""
    from .connection import PipeConnection
    return PipeConnection(dh.detach(), readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000145
146else:
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200147 # Unix
def send_handle(conn, handle, destination_pid):
    """Send file descriptor *handle* over the Unix socket behind *conn*.

    The descriptor travels as SCM_RIGHTS ancillary data attached to a
    one-byte message.  *destination_pid* is accepted but not used here.
    """
    with socket.fromfd(conn.fileno(), socket.AF_UNIX,
                       socket.SOCK_STREAM) as sock:
        packed_fd = struct.pack("@i", handle)
        ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, packed_fd)]
        sock.sendmsg([b'x'], ancillary)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000152
def recv_handle(conn):
    """Receive a file descriptor sent over the Unix socket behind *conn*.

    Returns the descriptor as an int.  Raises RuntimeError if the message
    does not carry a usable SCM_RIGHTS ancillary item.
    """
    fd_size = struct.calcsize("@i")
    with socket.fromfd(conn.fileno(), socket.AF_UNIX,
                       socket.SOCK_STREAM) as sock:
        _data, ancillary, _flags, _addr = sock.recvmsg(
            1, socket.CMSG_LEN(fd_size))
        try:
            level, kind, payload = ancillary[0]
            carries_fd = (level == socket.SOL_SOCKET and
                          kind == socket.SCM_RIGHTS)
            if carries_fd:
                (received_fd,) = struct.unpack("@i", payload[:fd_size])
                return received_fd
        except (ValueError, IndexError, struct.error):
            # Missing or malformed ancillary data: report it below.
            pass
        raise RuntimeError('Invalid data received')
165
class DupFd(object):
    """Stand-in for a file descriptor handed over through ``resource_sharer``."""

    def __init__(self, fd):
        """Duplicate *fd* and register the duplicate with ``resource_sharer``."""
        dup_fd = os.dup(fd)

        def _send_dup(conn, pid):
            send_handle(conn, dup_fd, pid)

        def _close_dup():
            os.close(dup_fd)

        self._id = resource_sharer.register(_send_dup, _close_dup)

    def detach(self):
        """Fetch the registered descriptor and return it."""
        conn = resource_sharer.get_connection(self._id)
        try:
            return recv_handle(conn)
        finally:
            conn.close()
181
def reduce_socket(s):
    """Return a ``(callable, args)`` pair that rebuilds socket *s*.

    The args carry a ``DupFd`` for the socket's descriptor plus the socket's
    family, type and protocol.
    """
    rebuild_args = (DupFd(s.fileno()), s.family, s.type, s.proto)
    return rebuild_socket, rebuild_args
185
def rebuild_socket(df, family, type, proto):
    """Rebuild a socket from the descriptor obtained by detaching *df*.

    ``socket.fromfd`` duplicates the descriptor, so the one returned by
    ``df.detach()`` is always closed here -- including when ``fromfd``
    raises, which previously leaked it.

    Returns the new socket object; any exception from ``socket.fromfd``
    propagates unchanged.
    """
    fd = df.detach()
    try:
        return socket.fromfd(fd, family, type, proto)
    finally:
        os.close(fd)
191
def reduce_connection(conn):
    """Return a ``(callable, args)`` pair that rebuilds connection *conn*."""
    dup = DupFd(conn.fileno())
    rebuild_args = (dup, conn.readable, conn.writable)
    return rebuild_connection, rebuild_args
195
def rebuild_connection(df, readable, writable):
    """Rebuild a ``Connection`` from the descriptor obtained by detaching *df*."""
    from .connection import Connection
    return Connection(df.detach(), readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000200
#
# Server which shares registered resources with clients
#
204
class ResourceSharer(object):
    """Hand registered resources to clients that connect and ask for them.

    A resource is a ``(send, close)`` pair of callables stored under an
    integer key.  A daemon thread accepts connections on a listener; for
    each ``(key, pid)`` request it removes the entry, calls
    ``send(conn, pid)`` and then ``close()``.
    """

    def __init__(self):
        self._key = 0
        self._cache = {}
        # Locks replaced after a fork are parked here (see _afterfork).
        self._old_locks = []
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        register_after_fork(self, ResourceSharer._afterfork)

    def register(self, send, close):
        """Register a resource and return its ``(address, key)`` identifier.

        The listener and serving thread are started on first use.
        """
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        """Connect to the sharer named by *ident* and request its resource.

        *ident* is an ``(address, key)`` pair as returned by ``register``.
        Returns the open client connection, on which ``(key, pid)`` of the
        calling process has already been sent.
        """
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=current_process().authkey)
        c.send((key, os.getpid()))
        return c

    def stop(self, timeout=None):
        """Ask the serving thread to stop and release everything registered.

        *timeout* is passed to ``Thread.join``; a warning is logged if the
        thread is still alive afterwards.
        """
        from .connection import Client
        with self._lock:
            if self._address is not None:
                c = Client(self._address, authkey=current_process().authkey)
                try:
                    # A None message tells _serve() to leave its loop.
                    c.send(None)
                finally:
                    c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    # Fixed: this used the undefined name ``sub_warn``.
                    sub_warning(
                        'ResourceSharer thread did not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                for _send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        """Reset state after a fork: close resources, listener, and lock."""
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        """Create the listener and start the daemon thread that serves it."""
        from .connection import Listener
        assert self._listener is None
        debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        """Serve requests until a ``None`` message is received."""
        if hasattr(signal, 'pthread_sigmask'):
            # Block signals in this thread.  Prefer valid_signals() where it
            # exists: range(1, NSIG) can contain invalid signal numbers.
            valid_signals = getattr(signal, 'valid_signals', None)
            if valid_signals is not None:
                signums = valid_signals()
            else:
                signums = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, signums)
        while 1:
            try:
                conn = self._listener.accept()
                try:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        # The entry is already out of the cache, so release
                        # the resource even if sending failed.
                        close()
                finally:
                    # Always close the accepted connection, including on
                    # errors and for the stop message.
                    conn.close()
            except:
                # Deliberately broad: one bad request must not kill the
                # serving thread.  Report it unless the process is exiting.
                if not is_exiting():
                    import traceback
                    sub_warning(
                        'thread for sharing handles raised exception :\n' +
                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
                        )
Benjamin Petersone711caf2008-06-11 16:44:04 +0000296
# Module-level sharer instance; DupSocket and DupFd register their resources
# with it and fetch them back through its get_connection().
resource_sharer = ResourceSharer()