blob: 5e94fedfb8d559cbed4e0813f9e81e54a26f4d31 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
# Names exported by ``from multiprocessing.connection import *``.
__all__ = [ 'Client', 'Listener', 'Pipe' ]
10
11import os
12import sys
13import socket
Georg Brandl6aa2d1f2008-08-12 08:35:52 +000014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import time
16import tempfile
17import itertools
18
19import _multiprocessing
Neal Norwitz5d6415e2008-08-25 01:53:32 +000020from multiprocessing import current_process, AuthenticationError
Benjamin Petersone711caf2008-06-11 16:44:04 +000021from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
22from multiprocessing.forking import duplicate, close
23
24
25#
26#
27#
28
# Buffer size (in bytes) requested when creating Windows named pipes.
BUFSIZE = 8192

# Supplies the sequence number embedded in generated pipe names.
_mmap_counter = itertools.count()

# Address families usable on this platform.  'AF_INET' is always present;
# the others are appended when available, and the last one appended
# becomes the default.
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')

if sys.platform == 'win32':
    families.append('AF_PIPE')

default_family = families[-1]
43
44#
45#
46#
47
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` must be one of:

      'AF_INET' -- returns the tuple ('localhost', 0)
      'AF_UNIX' -- returns a fresh 'listener-*' path inside the
                   directory given by get_temp_dir()
      'AF_PIPE' -- returns a fresh name of the form
                   \\\\.\\pipe\\pyc-<pid>-<counter>-<random>

    Any other value raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    elif family == 'AF_PIPE':
        # dir='' stops mktemp() from joining the pipe name onto the
        # temporary directory: a pipe name is not a filesystem path and
        # must be returned exactly as built from the prefix.
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir='')
    else:
        raise ValueError('unrecognized family')
61
62
def address_type(address):
    '''
    Return the address family implied by the form of `address`

    A tuple gives 'AF_INET', a string starting with two backslashes
    gives 'AF_PIPE', and any other string gives 'AF_UNIX'.  Subclasses
    of tuple and str (for example a namedtuple address) are accepted.

    Raises ValueError for any other kind of object.
    '''
    if isinstance(address, tuple):
        return 'AF_INET'
    elif isinstance(address, str):
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    else:
        # Wrap in a 1-tuple so the value is always formatted as a single
        # argument, whatever its type.
        raise ValueError('address type of %r unrecognized' % (address,))
77
78#
79# Public functions
80#
81
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Bind a listener.

        `family` defaults to the family implied by `address`, or to
        `default_family` when no address is given; `address` defaults to
        arbitrary_address(family).  `backlog` is passed through to the
        underlying listener.  `authkey`, when not None, must be a byte
        string; every accepted connection is then authenticated with it.

        Raises TypeError if `authkey` is neither None nor bytes.
        '''
        # Validate first: failing after the listener has been created
        # would leave a bound socket or pipe that nobody can close.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  When an authkey was given, the
        challenge/response handshake is run in both directions before
        the connection is returned.
        '''
        c = self._listener.accept()
        # `is not None` mirrors the test made by Client(); a truthiness
        # test would skip authentication for an empty key while the
        # client still expects a challenge.
        if self._authkey is not None:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    @property
    def address(self):
        '''Address recorded by the underlying listener.'''
        return self._listener._address

    @property
    def last_accepted(self):
        '''`_last_accepted` value recorded by the underlying listener.'''
        return self._listener._last_accepted
124
125
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family implied by `address`.  `authkey`,
    when not None, must be a byte string; the challenge/response
    handshake is then run in both directions before returning.

    Raises TypeError if `authkey` is neither None nor bytes.
    '''
    # Validate before connecting so that a bad authkey cannot leave an
    # open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        # Reverse order to Listener.accept(): answer first, then challenge.
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c
143 return c
144
145
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true the pair is built on a socket pair.
        Otherwise it is built on os.pipe(): the first connection is
        created with writable=False and the second with readable=False.
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            try:
                # The connections own duplicated descriptors, so the
                # socket objects can be closed whether or not wrapping
                # succeeds.
                c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
                c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            finally:
                s1.close()
                s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2

else:

    # _multiprocessing is a top-level extension module (it is imported
    # absolutely at the top of this file), so it must not be imported
    # relative to the package.
    from _multiprocessing import win32

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        Both ends are handles on one freshly named message-mode pipe.
        With `duplex` false the first connection is created with
        writable=False and the second with readable=False.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        # h1 is the server end of the pipe (a single instance only).
        h1 = win32.CreateNamedPipe(
            address, openmode,
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
            win32.PIPE_WAIT,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        # h2 is the client end, switched to message read mode.
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(h1, win32.NULL)
        except WindowsError as e:
            # The client end was opened above, so "already connected"
            # is the expected outcome rather than a failure.
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)

        return c1, c2
206
207#
208# Definitions for connections based on sockets
209#
210
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        '''
        Create a socket of the named `family` (an attribute name of the
        socket module such as 'AF_INET'), bind it to `address` and start
        listening with the given `backlog`.

        The socket is closed again if any of these steps fails.
        '''
        self._socket = socket.socket(getattr(socket, family))
        try:
            # On Windows SO_REUSEADDR lets several sockets bind the same
            # port at once, so only request it on posix.
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.bind(address)
            self._socket.listen(backlog)
            # Read the address back: it may differ from the one requested
            # (for example when port 0 was asked for).
            self._address = self._socket.getsockname()
        except BaseException:
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is closed or
            # finalized.
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Wait for a connection and return it wrapped in a Connection
        object.  The peer address is stored in `_last_accepted`.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            # The Connection gets its own duplicate of the descriptor ...
            fd = duplicate(s.fileno())
        finally:
            # ... so the socket object is always closed here.
            s.close()
        return _multiprocessing.Connection(fd)

    def close(self):
        '''
        Close the listening socket and, for AF_UNIX, run the unlink
        finalizer (at most once).
        '''
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()
242
243
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    A refused connection is retried every 10ms for up to 20 seconds;
    after that, or on any other socket error, the error is logged with
    debug() and re-raised.  The socket is closed in every case; the
    returned connection owns a duplicate of its descriptor.
    '''
    family = address_type(address)
    s = socket.socket(getattr(socket, family))
    try:
        # Bound the retry loop so a listener that never appears cannot
        # hang the caller forever.
        deadline = time.time() + 20.0
        while True:
            try:
                s.connect(address)
            except socket.error as e:
                refused = e.args[0] == errno.ECONNREFUSED
                if not refused or time.time() > deadline:
                    debug('failed to connect to address %s', address)
                    raise
                time.sleep(0.01)
            else:
                break

        fd = duplicate(s.fileno())
    finally:
        s.close()
    return _multiprocessing.Connection(fd)
268
269#
270# Definitions for connections based on named pipes
271#
272
if sys.platform == 'win32':

    # NOTE: `win32` is bound by the platform-specific import that
    # accompanies the Windows definition of Pipe() earlier in this file.

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            '''
            Create the first instance of the pipe named `address`.
            `backlog` is accepted for signature compatibility with
            SocketListener and is not used.
            '''
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            # Pipe instances created but not yet handed out by accept().
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # close() is the finalizer itself: calling it closes every
            # handle still in the queue.
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            '''
            Wait for a client on the oldest queued pipe instance and
            return it wrapped in a PipeConnection.
            '''
            # Queue a replacement instance before taking the oldest one,
            # so the queue always holds an instance for the next client.
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                    # The handle has left the queue, so the finalizer
                    # will no longer close it; do it here.
                    close(handle)
                    raise
            return _multiprocessing.PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            '''Close every pipe handle remaining in `queue`.'''
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        While the pipe is busy or the wait times out, the attempt is
        repeated for up to 20 seconds; any other error is raised at once.
        '''
        # Bound the retry loop so a permanently busy pipe cannot hang the
        # caller forever.
        deadline = time.time() + 20.0
        while True:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                retryable = e.args[0] in (win32.ERROR_SEM_TIMEOUT,
                                          win32.ERROR_PIPE_BUSY)
                if not retryable or time.time() > deadline:
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return _multiprocessing.PipeConnection(h)
345
346#
347# Authentication stuff
348#
349
# Number of random bytes in each challenge.
MESSAGE_LENGTH = 20

# Fixed byte strings exchanged during the handshake.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes and expects
    the HMAC-MD5 digest of those bytes, keyed with `authkey`, in reply.
    Sends WELCOME on a match; otherwise sends FAILURE and raises
    AuthenticationError.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 used to be hmac's implicit default.  Naming it keeps the bytes
    # on the wire unchanged and works where the default was removed.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Prefer a constant-time comparison where the hmac module has one.
    compare = getattr(hmac, 'compare_digest', None)
    if compare is not None:
        matched = compare(response, digest)
    else:
        matched = response == digest
    if matched:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by the peer's deliver_challenge().

    Receives the challenge, replies with the HMAC-MD5 digest of its
    random part keyed with `authkey`, and expects WELCOME back.

    Raises AssertionError if the message received is not a challenge,
    and AuthenticationError if the peer does not reply with WELCOME.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = connection.recv_bytes(256)         # reject large message
    # This validates data from the peer, so it must not disappear under
    # -O as an assert statement would; the exception type is kept.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AssertionError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
379 raise AuthenticationError('digest sent was rejected')
380
381#
382# Support for using xmlrpclib for serialization
383#
384
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send() and recv() serialize objects with
    the supplied `dumps` and `loads` callables.

    fileno, close, poll, recv_bytes and send_bytes are taken straight
    from the wrapped connection.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn, self._dumps, self._loads = conn, dumps, loads
        # Re-export the wrapped connection's bound methods unchanged.
        for name in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        '''Serialize `obj` with `dumps` and send the resulting bytes.'''
        self._conn.send_bytes(self._dumps(obj))

    def recv(self):
        '''Receive one message and return it decoded with `loads`.'''
        return self._loads(self._conn.recv_bytes())
398 return self._loads(s)
399
400def _xml_dumps(obj):
401 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
402
403def _xml_loads(s):
404 (obj,), method = xmlrpclib.loads(s.decode('utf8'))
405 return obj
406
class XmlListener(Listener):
    '''
    Listener whose accepted connections are wrapped in a
    ConnectionWrapper using _xml_dumps and _xml_loads.
    '''
    def accept(self):
        # Lazily publish xmlrpc.client under the module-level name
        # `xmlrpclib`.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        return ConnectionWrapper(Listener.accept(self),
                                 _xml_dumps, _xml_loads)
413
def XmlClient(*args, **kwds):
    '''
    Call Client() with the given arguments and return the resulting
    connection wrapped in a ConnectionWrapper using _xml_dumps and
    _xml_loads.
    '''
    # Lazily publish xmlrpc.client under the module-level name
    # `xmlrpclib`.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    conn = Client(*args, **kwds)
    return ConnectionWrapper(conn, _xml_dumps, _xml_loads)