blob: dda4a4120b407126565b93b7cf7662faed8f54b1 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module to allow connection and socket objects to be transferred
3# between processes
4#
5# multiprocessing/reduction.py
6#
R. David Murray3fc969a2010-12-14 01:38:16 +00007# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15# notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17# notice, this list of conditions and the following disclaimer in the
18# documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20# used to endorse or promote products derived from this software
21# without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000034#
35
36__all__ = []
37
38import os
39import sys
40import socket
41import threading
Charles-François Natalidc863dd2011-09-24 20:04:29 +020042import struct
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Benjamin Petersone711caf2008-06-11 16:44:04 +000044from multiprocessing import current_process
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000045from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000046from multiprocessing.util import register_after_fork, debug, sub_debug
Antoine Pitrou87cf2202011-05-09 17:04:27 +020047from multiprocessing.connection import Client, Listener, Connection
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
49
50#
51#
52#
53
Charles-François Natalidc863dd2011-09-24 20:04:29 +020054if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
55 hasattr(socket, 'SCM_RIGHTS'))):
Benjamin Petersone711caf2008-06-11 16:44:04 +000056 raise ImportError('pickling of connections not supported')
57
58#
59# Platform specific definitions
60#
61
62if sys.platform == 'win32':
Brian Curtina6a32742010-08-04 15:47:24 +000063 from _multiprocessing import win32
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
65 def send_handle(conn, handle, destination_pid):
66 process_handle = win32.OpenProcess(
67 win32.PROCESS_ALL_ACCESS, False, destination_pid
68 )
69 try:
70 new_handle = duplicate(handle, process_handle)
71 conn.send(new_handle)
72 finally:
73 close(process_handle)
74
75 def recv_handle(conn):
76 return conn.recv()
77
78else:
79 def send_handle(conn, handle, destination_pid):
Charles-François Natalidc863dd2011-09-24 20:04:29 +020080 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
81 s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
82 struct.pack("@i", handle))])
Benjamin Petersone711caf2008-06-11 16:44:04 +000083
84 def recv_handle(conn):
Charles-François Natalidc863dd2011-09-24 20:04:29 +020085 size = struct.calcsize("@i")
86 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
87 msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
88 try:
89 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
90 if (cmsg_level == socket.SOL_SOCKET and
91 cmsg_type == socket.SCM_RIGHTS):
92 return struct.unpack("@i", cmsg_data[:size])[0]
93 except (ValueError, IndexError, struct.error):
94 pass
95 raise RuntimeError('Invalid data received')
96
Benjamin Petersone711caf2008-06-11 16:44:04 +000097
98#
99# Support for a per-process server thread which caches pickled handles
100#
101
102_cache = set()
103
104def _reset(obj):
105 global _lock, _listener, _cache
106 for h in _cache:
107 close(h)
108 _cache.clear()
109 _lock = threading.Lock()
110 _listener = None
111
112_reset(None)
113register_after_fork(_reset, _reset)
114
115def _get_listener():
116 global _listener
117
118 if _listener is None:
119 _lock.acquire()
120 try:
121 if _listener is None:
122 debug('starting listener and thread for sending handles')
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000123 _listener = Listener(authkey=current_process().authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000124 t = threading.Thread(target=_serve)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000125 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126 t.start()
127 finally:
128 _lock.release()
129
130 return _listener
131
132def _serve():
133 from .util import is_exiting, sub_warning
134
135 while 1:
136 try:
137 conn = _listener.accept()
138 handle_wanted, destination_pid = conn.recv()
139 _cache.remove(handle_wanted)
140 send_handle(conn, handle_wanted, destination_pid)
141 close(handle_wanted)
142 conn.close()
143 except:
144 if not is_exiting():
145 import traceback
146 sub_warning(
147 'thread for sharing handles raised exception :\n' +
148 '-'*79 + '\n' + traceback.format_exc() + '-'*79
149 )
150
151#
152# Functions to be used for pickling/unpickling objects with handles
153#
154
155def reduce_handle(handle):
156 if Popen.thread_is_spawning():
157 return (None, Popen.duplicate_for_child(handle), True)
158 dup_handle = duplicate(handle)
159 _cache.add(dup_handle)
160 sub_debug('reducing handle %d', handle)
161 return (_get_listener().address, dup_handle, False)
162
163def rebuild_handle(pickled_data):
164 address, handle, inherited = pickled_data
165 if inherited:
166 return handle
167 sub_debug('rebuilding handle %d', handle)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000168 conn = Client(address, authkey=current_process().authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169 conn.send((handle, os.getpid()))
170 new_handle = recv_handle(conn)
171 conn.close()
172 return new_handle
173
174#
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200175# Register `Connection` with `ForkingPickler`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000176#
177
178def reduce_connection(conn):
179 rh = reduce_handle(conn.fileno())
180 return rebuild_connection, (rh, conn.readable, conn.writable)
181
182def rebuild_connection(reduced_handle, readable, writable):
183 handle = rebuild_handle(reduced_handle)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200184 return Connection(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000185 handle, readable=readable, writable=writable
186 )
187
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200188ForkingPickler.register(Connection, reduce_connection)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189
190#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000191# Register `socket.socket` with `ForkingPickler`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192#
193
194def fromfd(fd, family, type_, proto=0):
195 s = socket.fromfd(fd, family, type_, proto)
196 if s.__class__ is not socket.socket:
197 s = socket.socket(_sock=s)
198 return s
199
200def reduce_socket(s):
201 reduced_handle = reduce_handle(s.fileno())
202 return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
203
204def rebuild_socket(reduced_handle, family, type_, proto):
205 fd = rebuild_handle(reduced_handle)
206 _sock = fromfd(fd, family, type_, proto)
207 close(fd)
208 return _sock
209
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000210ForkingPickler.register(socket.socket, reduce_socket)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000211
212#
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000213# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
Benjamin Petersone711caf2008-06-11 16:44:04 +0000214#
215
216if sys.platform == 'win32':
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200217 from multiprocessing.connection import PipeConnection
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218
219 def reduce_pipe_connection(conn):
220 rh = reduce_handle(conn.fileno())
221 return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
222
223 def rebuild_pipe_connection(reduced_handle, readable, writable):
224 handle = rebuild_handle(reduced_handle)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200225 return PipeConnection(
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 handle, readable=readable, writable=writable
227 )
228
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200229 ForkingPickler.register(PipeConnection, reduce_pipe_connection)