blob: fa6f7b39019020e4b31682863af7a7cde01c86db [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35__all__ = [ 'Client', 'Listener', 'Pipe' ]
36
37import os
38import sys
39import socket
Georg Brandl6aa2d1f2008-08-12 08:35:52 +000040import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000041import time
42import tempfile
43import itertools
44
45import _multiprocessing
Neal Norwitz5d6415e2008-08-25 01:53:32 +000046from multiprocessing import current_process, AuthenticationError
Benjamin Petersone711caf2008-06-11 16:44:04 +000047from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
48from multiprocessing.forking import duplicate, close
49
50
51#
52#
53#
54
55BUFSIZE = 8192
Antoine Pitrou45d61a32009-11-13 22:35:18 +000056# A very generous timeout when it comes to local connections...
57CONNECTION_TIMEOUT = 20.
Benjamin Petersone711caf2008-06-11 16:44:04 +000058
59_mmap_counter = itertools.count()
60
61default_family = 'AF_INET'
62families = ['AF_INET']
63
64if hasattr(socket, 'AF_UNIX'):
65 default_family = 'AF_UNIX'
66 families += ['AF_UNIX']
67
68if sys.platform == 'win32':
69 default_family = 'AF_PIPE'
70 families += ['AF_PIPE']
71
Antoine Pitrou45d61a32009-11-13 22:35:18 +000072
73def _init_timeout(timeout=CONNECTION_TIMEOUT):
74 return time.time() + timeout
75
76def _check_timeout(t):
77 return time.time() > t
78
Benjamin Petersone711caf2008-06-11 16:44:04 +000079#
80#
81#
82
83def arbitrary_address(family):
84 '''
85 Return an arbitrary free address for the given family
86 '''
87 if family == 'AF_INET':
88 return ('localhost', 0)
89 elif family == 'AF_UNIX':
90 return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
91 elif family == 'AF_PIPE':
92 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
93 (os.getpid(), next(_mmap_counter)))
94 else:
95 raise ValueError('unrecognized family')
96
Antoine Pitrou709176f2012-04-01 17:19:09 +020097def _validate_family(family):
98 '''
99 Checks if the family is valid for the current environment.
100 '''
101 if sys.platform != 'win32' and family == 'AF_PIPE':
102 raise ValueError('Family %s is not recognized.' % family)
103
Benjamin Petersone711caf2008-06-11 16:44:04 +0000104
105def address_type(address):
106 '''
107 Return the types of the address
108
109 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
110 '''
111 if type(address) == tuple:
112 return 'AF_INET'
113 elif type(address) is str and address.startswith('\\\\'):
114 return 'AF_PIPE'
115 elif type(address) is str:
116 return 'AF_UNIX'
117 else:
118 raise ValueError('address type of %r unrecognized' % address)
119
120#
121# Public functions
122#
123
124class Listener(object):
125 '''
126 Returns a listener object.
127
128 This is a wrapper for a bound socket which is 'listening' for
129 connections, or for a Windows named pipe.
130 '''
131 def __init__(self, address=None, family=None, backlog=1, authkey=None):
132 family = family or (address and address_type(address)) \
133 or default_family
134 address = address or arbitrary_address(family)
135
Antoine Pitrou709176f2012-04-01 17:19:09 +0200136 _validate_family(family)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000137 if family == 'AF_PIPE':
138 self._listener = PipeListener(address, backlog)
139 else:
140 self._listener = SocketListener(address, family, backlog)
141
142 if authkey is not None and not isinstance(authkey, bytes):
143 raise TypeError('authkey should be a byte string')
144
145 self._authkey = authkey
146
147 def accept(self):
148 '''
149 Accept a connection on the bound socket or named pipe of `self`.
150
151 Returns a `Connection` object.
152 '''
153 c = self._listener.accept()
154 if self._authkey:
155 deliver_challenge(c, self._authkey)
156 answer_challenge(c, self._authkey)
157 return c
158
159 def close(self):
160 '''
161 Close the bound socket or named pipe of `self`.
162 '''
163 return self._listener.close()
164
165 address = property(lambda self: self._listener._address)
166 last_accepted = property(lambda self: self._listener._last_accepted)
167
168
169def Client(address, family=None, authkey=None):
170 '''
171 Returns a connection to the address of a `Listener`
172 '''
173 family = family or address_type(address)
Antoine Pitrou709176f2012-04-01 17:19:09 +0200174 _validate_family(family)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175 if family == 'AF_PIPE':
176 c = PipeClient(address)
177 else:
178 c = SocketClient(address)
179
180 if authkey is not None and not isinstance(authkey, bytes):
181 raise TypeError('authkey should be a byte string')
182
183 if authkey is not None:
184 answer_challenge(c, authkey)
185 deliver_challenge(c, authkey)
186
187 return c
188
189
190if sys.platform != 'win32':
191
192 def Pipe(duplex=True):
193 '''
194 Returns pair of connection objects at either end of a pipe
195 '''
196 if duplex:
197 s1, s2 = socket.socketpair()
198 c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
199 c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
200 s1.close()
201 s2.close()
202 else:
203 fd1, fd2 = os.pipe()
204 c1 = _multiprocessing.Connection(fd1, writable=False)
205 c2 = _multiprocessing.Connection(fd2, readable=False)
206
207 return c1, c2
208
209else:
210
Brian Curtina6a32742010-08-04 15:47:24 +0000211 from _multiprocessing import win32
Benjamin Petersone711caf2008-06-11 16:44:04 +0000212
213 def Pipe(duplex=True):
214 '''
215 Returns pair of connection objects at either end of a pipe
216 '''
217 address = arbitrary_address('AF_PIPE')
218 if duplex:
219 openmode = win32.PIPE_ACCESS_DUPLEX
220 access = win32.GENERIC_READ | win32.GENERIC_WRITE
221 obsize, ibsize = BUFSIZE, BUFSIZE
222 else:
223 openmode = win32.PIPE_ACCESS_INBOUND
224 access = win32.GENERIC_WRITE
225 obsize, ibsize = 0, BUFSIZE
226
227 h1 = win32.CreateNamedPipe(
228 address, openmode,
229 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
230 win32.PIPE_WAIT,
231 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
232 )
233 h2 = win32.CreateFile(
234 address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
235 )
236 win32.SetNamedPipeHandleState(
237 h2, win32.PIPE_READMODE_MESSAGE, None, None
238 )
239
240 try:
241 win32.ConnectNamedPipe(h1, win32.NULL)
242 except WindowsError as e:
243 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
244 raise
245
246 c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
247 c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
248
249 return c1, c2
250
251#
252# Definitions for connections based on sockets
253#
254
255class SocketListener(object):
256 '''
Georg Brandl734e2682008-08-12 08:18:18 +0000257 Representation of a socket which is bound to an address and listening
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258 '''
259 def __init__(self, address, family, backlog=1):
260 self._socket = socket.socket(getattr(socket, family))
Charles-François Natali992ca522012-02-04 14:55:53 +0100261 try:
262 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
263 self._socket.bind(address)
264 self._socket.listen(backlog)
265 self._address = self._socket.getsockname()
266 except socket.error:
267 self._socket.close()
268 raise
Benjamin Petersone711caf2008-06-11 16:44:04 +0000269 self._family = family
270 self._last_accepted = None
271
Benjamin Petersone711caf2008-06-11 16:44:04 +0000272 if family == 'AF_UNIX':
273 self._unlink = Finalize(
Georg Brandl2ee470f2008-07-16 12:55:28 +0000274 self, os.unlink, args=(address,), exitpriority=0
Benjamin Petersone711caf2008-06-11 16:44:04 +0000275 )
276 else:
277 self._unlink = None
278
279 def accept(self):
280 s, self._last_accepted = self._socket.accept()
281 fd = duplicate(s.fileno())
282 conn = _multiprocessing.Connection(fd)
283 s.close()
284 return conn
285
286 def close(self):
287 self._socket.close()
288 if self._unlink is not None:
289 self._unlink()
290
291
292def SocketClient(address):
293 '''
294 Return a connection object connected to the socket given by `address`
295 '''
296 family = address_type(address)
Victor Stinner2b695062011-01-03 15:47:59 +0000297 with socket.socket( getattr(socket, family) ) as s:
298 t = _init_timeout()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000299
Victor Stinner2b695062011-01-03 15:47:59 +0000300 while 1:
301 try:
302 s.connect(address)
303 except socket.error as e:
304 if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
305 debug('failed to connect to address %s', address)
306 raise
307 time.sleep(0.01)
308 else:
309 break
Benjamin Petersone711caf2008-06-11 16:44:04 +0000310 else:
Victor Stinner2b695062011-01-03 15:47:59 +0000311 raise
Benjamin Petersone711caf2008-06-11 16:44:04 +0000312
Victor Stinner2b695062011-01-03 15:47:59 +0000313 fd = duplicate(s.fileno())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000314 conn = _multiprocessing.Connection(fd)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000315 return conn
316
317#
318# Definitions for connections based on named pipes
319#
320
321if sys.platform == 'win32':
322
323 class PipeListener(object):
324 '''
325 Representation of a named pipe
326 '''
327 def __init__(self, address, backlog=None):
328 self._address = address
329 handle = win32.CreateNamedPipe(
330 address, win32.PIPE_ACCESS_DUPLEX,
331 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
332 win32.PIPE_WAIT,
333 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
334 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
335 )
336 self._handle_queue = [handle]
337 self._last_accepted = None
338
339 sub_debug('listener created with address=%r', self._address)
340
341 self.close = Finalize(
342 self, PipeListener._finalize_pipe_listener,
343 args=(self._handle_queue, self._address), exitpriority=0
344 )
345
346 def accept(self):
347 newhandle = win32.CreateNamedPipe(
348 self._address, win32.PIPE_ACCESS_DUPLEX,
349 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
350 win32.PIPE_WAIT,
351 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
352 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
353 )
354 self._handle_queue.append(newhandle)
355 handle = self._handle_queue.pop(0)
356 try:
357 win32.ConnectNamedPipe(handle, win32.NULL)
358 except WindowsError as e:
359 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
360 raise
361 return _multiprocessing.PipeConnection(handle)
362
363 @staticmethod
364 def _finalize_pipe_listener(queue, address):
365 sub_debug('closing listener with address=%r', address)
366 for handle in queue:
367 close(handle)
368
369 def PipeClient(address):
370 '''
371 Return a connection object connected to the pipe given by `address`
372 '''
Antoine Pitrou45d61a32009-11-13 22:35:18 +0000373 t = _init_timeout()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000374 while 1:
375 try:
376 win32.WaitNamedPipe(address, 1000)
377 h = win32.CreateFile(
378 address, win32.GENERIC_READ | win32.GENERIC_WRITE,
379 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
380 )
381 except WindowsError as e:
382 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
Antoine Pitrou45d61a32009-11-13 22:35:18 +0000383 win32.ERROR_PIPE_BUSY) or _check_timeout(t):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 raise
385 else:
386 break
387 else:
388 raise
389
390 win32.SetNamedPipeHandleState(
391 h, win32.PIPE_READMODE_MESSAGE, None, None
392 )
393 return _multiprocessing.PipeConnection(h)
394
395#
396# Authentication stuff
397#
398
399MESSAGE_LENGTH = 20
400
Benjamin Peterson1fcfe212008-06-25 12:54:22 +0000401CHALLENGE = b'#CHALLENGE#'
402WELCOME = b'#WELCOME#'
403FAILURE = b'#FAILURE#'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000404
405def deliver_challenge(connection, authkey):
406 import hmac
407 assert isinstance(authkey, bytes)
408 message = os.urandom(MESSAGE_LENGTH)
409 connection.send_bytes(CHALLENGE + message)
410 digest = hmac.new(authkey, message).digest()
411 response = connection.recv_bytes(256) # reject large message
412 if response == digest:
413 connection.send_bytes(WELCOME)
414 else:
415 connection.send_bytes(FAILURE)
416 raise AuthenticationError('digest received was wrong')
417
418def answer_challenge(connection, authkey):
419 import hmac
420 assert isinstance(authkey, bytes)
421 message = connection.recv_bytes(256) # reject large message
422 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
423 message = message[len(CHALLENGE):]
424 digest = hmac.new(authkey, message).digest()
425 connection.send_bytes(digest)
426 response = connection.recv_bytes(256) # reject large message
427 if response != WELCOME:
428 raise AuthenticationError('digest sent was rejected')
429
430#
431# Support for using xmlrpclib for serialization
432#
433
434class ConnectionWrapper(object):
435 def __init__(self, conn, dumps, loads):
436 self._conn = conn
437 self._dumps = dumps
438 self._loads = loads
439 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
440 obj = getattr(conn, attr)
441 setattr(self, attr, obj)
442 def send(self, obj):
443 s = self._dumps(obj)
444 self._conn.send_bytes(s)
445 def recv(self):
446 s = self._conn.recv_bytes()
447 return self._loads(s)
448
449def _xml_dumps(obj):
450 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
451
452def _xml_loads(s):
453 (obj,), method = xmlrpclib.loads(s.decode('utf8'))
454 return obj
455
456class XmlListener(Listener):
457 def accept(self):
458 global xmlrpclib
459 import xmlrpc.client as xmlrpclib
460 obj = Listener.accept(self)
461 return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
462
463def XmlClient(*args, **kwds):
464 global xmlrpclib
465 import xmlrpc.client as xmlrpclib
466 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)