blob: ce38fe367e0a83f5328d7ffda1daf88631fca981 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module to allow connection and socket objects to be transferred
3# between processes
4#
5# multiprocessing/reduction.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15# notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17# notice, this list of conditions and the following disclaimer in the
18# documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20# used to endorse or promote products derived from this software
21# without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000034#
35
Antoine Pitrou5438ed12012-04-24 22:56:57 +020036__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38import os
39import sys
40import socket
41import threading
Charles-François Natalidc863dd2011-09-24 20:04:29 +020042import struct
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Benjamin Petersone711caf2008-06-11 16:44:04 +000044from multiprocessing import current_process
Benjamin Petersone711caf2008-06-11 16:44:04 +000045from multiprocessing.util import register_after_fork, debug, sub_debug
Antoine Pitrou5438ed12012-04-24 22:56:57 +020046from multiprocessing.util import is_exiting, sub_warning
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
48
49#
50#
51#
52
Charles-François Natalidc863dd2011-09-24 20:04:29 +020053if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
54 hasattr(socket, 'SCM_RIGHTS'))):
Benjamin Petersone711caf2008-06-11 16:44:04 +000055 raise ImportError('pickling of connections not supported')
56
57#
58# Platform specific definitions
59#
60
61if sys.platform == 'win32':
Antoine Pitrou5438ed12012-04-24 22:56:57 +020062 # Windows
63 __all__ += ['reduce_pipe_connection']
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020064 import _winapi
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
66 def send_handle(conn, handle, destination_pid):
Antoine Pitrou5438ed12012-04-24 22:56:57 +020067 dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
68 conn.send(dh)
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
70 def recv_handle(conn):
Antoine Pitrou5438ed12012-04-24 22:56:57 +020071 return conn.recv().detach()
72
73 class DupHandle(object):
74 def __init__(self, handle, access, pid=None):
75 # duplicate handle for process with given pid
76 if pid is None:
77 pid = os.getpid()
78 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
79 try:
80 self._handle = _winapi.DuplicateHandle(
81 _winapi.GetCurrentProcess(),
82 handle, proc, access, False, 0)
83 finally:
84 _winapi.CloseHandle(proc)
85 self._access = access
86 self._pid = pid
87
88 def detach(self):
89 # retrieve handle from process which currently owns it
90 if self._pid == os.getpid():
91 return self._handle
92 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
93 self._pid)
94 try:
95 return _winapi.DuplicateHandle(
96 proc, self._handle, _winapi.GetCurrentProcess(),
97 self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
98 finally:
99 _winapi.CloseHandle(proc)
100
101 class DupSocket(object):
102 def __init__(self, sock):
103 new_sock = sock.dup()
104 def send(conn, pid):
105 share = new_sock.share(pid)
106 conn.send_bytes(share)
107 self._id = resource_sharer.register(send, new_sock.close)
108
109 def detach(self):
110 conn = resource_sharer.get_connection(self._id)
111 try:
112 share = conn.recv_bytes()
113 return socket.fromshare(share)
114 finally:
115 conn.close()
116
117 def reduce_socket(s):
118 return rebuild_socket, (DupSocket(s),)
119
120 def rebuild_socket(ds):
121 return ds.detach()
122
123 def reduce_connection(conn):
124 handle = conn.fileno()
125 with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
126 ds = DupSocket(s)
127 return rebuild_connection, (ds, conn.readable, conn.writable)
128
129 def rebuild_connection(ds, readable, writable):
130 from .connection import Connection
131 sock = ds.detach()
132 return Connection(sock.detach(), readable, writable)
133
134 def reduce_pipe_connection(conn):
135 access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
136 (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
137 dh = DupHandle(conn.fileno(), access)
138 return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
139
140 def rebuild_pipe_connection(dh, readable, writable):
141 from .connection import PipeConnection
142 handle = dh.detach()
143 return PipeConnection(handle, readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000144
145else:
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200146 # Unix
Benjamin Petersone711caf2008-06-11 16:44:04 +0000147 def send_handle(conn, handle, destination_pid):
Charles-François Natalidc863dd2011-09-24 20:04:29 +0200148 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
149 s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
150 struct.pack("@i", handle))])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000151
152 def recv_handle(conn):
Charles-François Natalidc863dd2011-09-24 20:04:29 +0200153 size = struct.calcsize("@i")
154 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
155 msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
156 try:
157 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
158 if (cmsg_level == socket.SOL_SOCKET and
159 cmsg_type == socket.SCM_RIGHTS):
160 return struct.unpack("@i", cmsg_data[:size])[0]
161 except (ValueError, IndexError, struct.error):
162 pass
163 raise RuntimeError('Invalid data received')
164
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200165 class DupFd(object):
166 def __init__(self, fd):
167 new_fd = os.dup(fd)
168 def send(conn, pid):
169 send_handle(conn, new_fd, pid)
170 def close():
171 os.close(new_fd)
172 self._id = resource_sharer.register(send, close)
173
174 def detach(self):
175 conn = resource_sharer.get_connection(self._id)
176 try:
177 return recv_handle(conn)
178 finally:
179 conn.close()
180
181 def reduce_socket(s):
182 df = DupFd(s.fileno())
183 return rebuild_socket, (df, s.family, s.type, s.proto)
184
185 def rebuild_socket(df, family, type, proto):
186 fd = df.detach()
187 s = socket.fromfd(fd, family, type, proto)
188 os.close(fd)
189 return s
190
191 def reduce_connection(conn):
192 df = DupFd(conn.fileno())
193 return rebuild_connection, (df, conn.readable, conn.writable)
194
195 def rebuild_connection(df, readable, writable):
196 from .connection import Connection
197 fd = df.detach()
198 return Connection(fd, readable, writable)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199
200#
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200201# Server which shares registered resources with clients
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202#
203
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200204class ResourceSharer(object):
205 def __init__(self):
206 self._key = 0
207 self._cache = {}
208 self._old_locks = []
209 self._lock = threading.Lock()
210 self._listener = None
211 self._address = None
212 register_after_fork(self, ResourceSharer._afterfork)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200214 def register(self, send, close):
215 with self._lock:
216 if self._address is None:
217 self._start()
218 self._key += 1
219 self._cache[self._key] = (send, close)
220 return (self._address, self._key)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000221
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200222 @staticmethod
223 def get_connection(ident):
224 from .connection import Client
225 address, key = ident
226 c = Client(address, authkey=current_process().authkey)
227 c.send((key, os.getpid()))
228 return c
Benjamin Petersone711caf2008-06-11 16:44:04 +0000229
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200230 def _afterfork(self):
231 for key, (send, close) in self._cache.items():
232 close()
233 self._cache.clear()
234 # If self._lock was locked at the time of the fork, it may be broken
235 # -- see issue 6721. Replace it without letting it be gc'ed.
236 self._old_locks.append(self._lock)
237 self._lock = threading.Lock()
238 if self._listener is not None:
239 self._listener.close()
240 self._listener = None
241 self._address = None
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200243 def _start(self):
244 from .connection import Listener
245 assert self._listener is None
246 debug('starting listener and thread for sending handles')
247 self._listener = Listener(authkey=current_process().authkey)
248 self._address = self._listener.address
249 t = threading.Thread(target=self._serve)
250 t.daemon = True
251 t.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000252
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200253 def _serve(self):
254 while 1:
255 try:
256 conn = self._listener.accept()
257 key, destination_pid = conn.recv()
258 send, close = self._cache.pop(key)
259 send(conn, destination_pid)
260 close()
261 conn.close()
262 except:
263 if not is_exiting():
264 import traceback
265 sub_warning(
266 'thread for sharing handles raised exception :\n' +
267 '-'*79 + '\n' + traceback.format_exc() + '-'*79
268 )
Benjamin Petersone711caf2008-06-11 16:44:04 +0000269
Antoine Pitrou5438ed12012-04-24 22:56:57 +0200270resource_sharer = ResourceSharer()