blob: 042a1368e0eafed6975a1782467614f61aed4b19 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module to allow connection and socket objects to be transferred
3# between processes
4#
5# multiprocessing/reduction.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15# notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17# notice, this list of conditions and the following disclaimer in the
18# documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20# used to endorse or promote products derived from this software
21# without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000034#
35
36__all__ = []
37
38import os
39import sys
40import socket
41import threading
Charles-François Natalidc863dd2011-09-24 20:04:29 +020042import struct
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
44import _multiprocessing
45from multiprocessing import current_process
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000046from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000047from multiprocessing.util import register_after_fork, debug, sub_debug
Antoine Pitrou87cf2202011-05-09 17:04:27 +020048from multiprocessing.connection import Client, Listener, Connection
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
50
51#
52#
53#
54
Charles-François Natalidc863dd2011-09-24 20:04:29 +020055if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
56 hasattr(socket, 'SCM_RIGHTS'))):
Benjamin Petersone711caf2008-06-11 16:44:04 +000057 raise ImportError('pickling of connections not supported')
58
59#
60# Platform specific definitions
61#
62
63if sys.platform == 'win32':
64 import _subprocess
Brian Curtina6a32742010-08-04 15:47:24 +000065 from _multiprocessing import win32
Benjamin Petersone711caf2008-06-11 16:44:04 +000066
67 def send_handle(conn, handle, destination_pid):
68 process_handle = win32.OpenProcess(
69 win32.PROCESS_ALL_ACCESS, False, destination_pid
70 )
71 try:
72 new_handle = duplicate(handle, process_handle)
73 conn.send(new_handle)
74 finally:
75 close(process_handle)
76
77 def recv_handle(conn):
78 return conn.recv()
79
80else:
81 def send_handle(conn, handle, destination_pid):
Charles-François Natalidc863dd2011-09-24 20:04:29 +020082 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
83 s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
84 struct.pack("@i", handle))])
Benjamin Petersone711caf2008-06-11 16:44:04 +000085
86 def recv_handle(conn):
Charles-François Natalidc863dd2011-09-24 20:04:29 +020087 size = struct.calcsize("@i")
88 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
89 msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
90 try:
91 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
92 if (cmsg_level == socket.SOL_SOCKET and
93 cmsg_type == socket.SCM_RIGHTS):
94 return struct.unpack("@i", cmsg_data[:size])[0]
95 except (ValueError, IndexError, struct.error):
96 pass
97 raise RuntimeError('Invalid data received')
98
Benjamin Petersone711caf2008-06-11 16:44:04 +000099
100#
101# Support for a per-process server thread which caches pickled handles
102#
103
104_cache = set()
105
106def _reset(obj):
107 global _lock, _listener, _cache
108 for h in _cache:
109 close(h)
110 _cache.clear()
111 _lock = threading.Lock()
112 _listener = None
113
114_reset(None)
115register_after_fork(_reset, _reset)
116
117def _get_listener():
118 global _listener
119
120 if _listener is None:
121 _lock.acquire()
122 try:
123 if _listener is None:
124 debug('starting listener and thread for sending handles')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000125 _listener = Listener(authkey=current_process().authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126 t = threading.Thread(target=_serve)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000127 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128 t.start()
129 finally:
130 _lock.release()
131
132 return _listener
133
134def _serve():
135 from .util import is_exiting, sub_warning
136
137 while 1:
138 try:
139 conn = _listener.accept()
140 handle_wanted, destination_pid = conn.recv()
141 _cache.remove(handle_wanted)
142 send_handle(conn, handle_wanted, destination_pid)
143 close(handle_wanted)
144 conn.close()
145 except:
146 if not is_exiting():
147 import traceback
148 sub_warning(
149 'thread for sharing handles raised exception :\n' +
150 '-'*79 + '\n' + traceback.format_exc() + '-'*79
151 )
152
153#
154# Functions to be used for pickling/unpickling objects with handles
155#
156
157def reduce_handle(handle):
158 if Popen.thread_is_spawning():
159 return (None, Popen.duplicate_for_child(handle), True)
160 dup_handle = duplicate(handle)
161 _cache.add(dup_handle)
162 sub_debug('reducing handle %d', handle)
163 return (_get_listener().address, dup_handle, False)
164
165def rebuild_handle(pickled_data):
166 address, handle, inherited = pickled_data
167 if inherited:
168 return handle
169 sub_debug('rebuilding handle %d', handle)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000170 conn = Client(address, authkey=current_process().authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000171 conn.send((handle, os.getpid()))
172 new_handle = recv_handle(conn)
173 conn.close()
174 return new_handle
175
176#
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200177# Register `Connection` with `ForkingPickler`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178#
179
180def reduce_connection(conn):
181 rh = reduce_handle(conn.fileno())
182 return rebuild_connection, (rh, conn.readable, conn.writable)
183
184def rebuild_connection(reduced_handle, readable, writable):
185 handle = rebuild_handle(reduced_handle)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200186 return Connection(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187 handle, readable=readable, writable=writable
188 )
189
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200190ForkingPickler.register(Connection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000193# Register `socket.socket` with `ForkingPickler`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000194#
195
196def fromfd(fd, family, type_, proto=0):
197 s = socket.fromfd(fd, family, type_, proto)
198 if s.__class__ is not socket.socket:
199 s = socket.socket(_sock=s)
200 return s
201
202def reduce_socket(s):
203 reduced_handle = reduce_handle(s.fileno())
204 return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
205
206def rebuild_socket(reduced_handle, family, type_, proto):
207 fd = rebuild_handle(reduced_handle)
208 _sock = fromfd(fd, family, type_, proto)
209 close(fd)
210 return _sock
211
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000212ForkingPickler.register(socket.socket, reduce_socket)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213
214#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000215# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216#
217
218if sys.platform == 'win32':
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200219 from multiprocessing.connection import PipeConnection
Benjamin Petersone711caf2008-06-11 16:44:04 +0000220
221 def reduce_pipe_connection(conn):
222 rh = reduce_handle(conn.fileno())
223 return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
224
225 def rebuild_pipe_connection(reduced_handle, readable, writable):
226 handle = rebuild_handle(reduced_handle)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200227 return PipeConnection(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228 handle, readable=readable, writable=writable
229 )
230
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200231 ForkingPickler.register(PipeConnection, reduce_pipe_connection)