blob: 57bf811eda5835d7d4d16d33d087396328039af5 [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
R. David Murray79af2452010-12-14 01:42:40 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000033#
34
35__all__ = [ 'Client', 'Listener', 'Pipe' ]
36
37import os
38import sys
39import socket
Jesse Noller5d353732008-08-11 19:00:15 +000040import errno
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000041import time
42import tempfile
43import itertools
44
45import _multiprocessing
Neal Norwitz0c519b32008-08-25 01:50:24 +000046from multiprocessing import current_process, AuthenticationError
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000047from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
48from multiprocessing.forking import duplicate, close
49
50
51#
52#
53#
54
# Buffer size (in bytes) requested for the named pipes created in this module.
BUFSIZE = 8192
# A very generous timeout (in seconds) when it comes to local connections...
CONNECTION_TIMEOUT = 20.0

# Source of per-process sequence numbers for generated pipe addresses.
_mmap_counter = itertools.count()

# Families are listed from the most generic to the most platform specific;
# the last one available on this platform is used as the default.
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')

if sys.platform == 'win32':
    families.append('AF_PIPE')

default_family = families[-1]
71
Antoine Pitrouc562ca42009-11-13 22:31:18 +000072
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the `time.time()` value at which a wait of `timeout` seconds,
    starting now, expires
    '''
    deadline = time.time() + timeout
    return deadline
75
def _check_timeout(t):
    '''
    Return True if the deadline `t` (a `time.time()` value) has passed
    '''
    now = time.time()
    return now > t
78
Benjamin Peterson7f03ea72008-06-13 19:20:48 +000079#
80#
81#
82
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` must be 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)

    if family == 'AF_UNIX':
        # Only a name is generated here; nothing is created on disk.
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())

    if family == 'AF_PIPE':
        # The pid and a per-process counter go into the pipe name prefix.
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                                next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix)

    raise ValueError('unrecognized family')
96
97
def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE':

    * a tuple (or tuple subclass, e.g. a namedtuple) is 'AF_INET';
    * a string starting with two backslashes is 'AF_PIPE';
    * any other string is 'AF_UNIX'.

    Raises ValueError for every other kind of object.
    '''
    # isinstance() rather than an exact type() match so that subclasses of
    # tuple and str are classified like their base types.
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    # Wrap in a 1-tuple so the value is always formatted as a single item.
    raise ValueError('address type of %r unrecognized' % (address,))
112
113#
114# Public functions
115#
116
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.

    `address` and `family` are both optional: a missing family is derived
    from the address (or falls back to `default_family`), and a missing
    address is generated with `arbitrary_address()`.  `backlog` is passed
    on to the underlying listener.  If `authkey` (a byte string) is given,
    every accepted connection must pass a challenge/response handshake.

    Raises TypeError if `authkey` is neither None nor a byte string.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate the key before anything is bound, so that a bad argument
        # cannot leave a listening socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  When an authkey was supplied the
        peer is authenticated first; if that fails the new connection is
        closed and the error is re-raised.
        '''
        c = self._listener.accept()
        # `is not None` (not truthiness) mirrors the test made by Client(),
        # so both ends agree on whether a handshake happens even for an
        # empty key.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except Exception:
                # The caller never sees `c`, so release it here.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    # Address the underlying listener is bound to.
    address = property(lambda self: self._listener._address)
    # Peer address recorded by the underlying listener's last accept().
    last_accepted = property(lambda self: self._listener._last_accepted)
159
160
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family implied by `address`.  If `authkey`
    (a byte string) is given, the challenge/response handshake is run
    before the connection is returned; on failure the connection is closed
    and the error is re-raised.

    Raises TypeError if `authkey` is neither None nor a byte string.
    '''
    # Check the key first so that a bad argument never opens a connection.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            # Reverse order to Listener.accept(): answer first, then
            # challenge the listener in turn.
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except Exception:
            c.close()
            raise

    return c
179
180
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends can send and receive; otherwise the
        first connection is read-only and the second write-only.
        '''
        if not duplex:
            read_fd, write_fd = os.pipe()
            c1 = _multiprocessing.Connection(read_fd, writable=False)
            c2 = _multiprocessing.Connection(write_fd, readable=False)
            return c1, c2

        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        # The connections get their own duplicated descriptors, so the
        # socket objects can be closed once both have been created.
        c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
        c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
        return c1, c2

else:
    from _multiprocessing import win32

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends can send and receive; otherwise the
        first connection is read-only and the second write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        pipe_mode = (win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                     win32.PIPE_WAIT)

        # Server end of a single-instance, message-mode pipe.
        server = win32.CreateNamedPipe(
            address, openmode, pipe_mode,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        # Client end, switched to message read mode to match the server.
        client = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            client, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(server, win32.NULL)
        except WindowsError as err:
            # The client end was opened just above, so "already connected"
            # is not a failure here.
            if err.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        c1 = _multiprocessing.PipeConnection(server, writable=duplex)
        c2 = _multiprocessing.PipeConnection(client, readable=duplex)

        return c1, c2
242
243#
244# Definitions for connections based on sockets
245#
246
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening

    `family` is the name of a socket family constant (looked up on the
    `socket` module).  For 'AF_UNIX' the socket file is unlinked when the
    listener is closed.
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            # Read the address back: the requested one may be a wildcard.
            self._address = self._socket.getsockname()
        except Exception:
            # Whatever went wrong (not only socket.error), do not leave the
            # half-configured socket open.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Finalizer that removes the socket file; run from close().
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Wait for a connection and return it as a `Connection` object

        The peer address is remembered in `self._last_accepted`.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            s.setblocking(True)
            fd = duplicate(s.fileno())
        finally:
            # The connection owns the duplicate; the accepted socket itself
            # is released whether or not duplication succeeded.
            s.close()
        return _multiprocessing.Connection(fd)

    def close(self):
        '''
        Close the listening socket and, if one was registered, run the
        finalizer that unlinks the socket file
        '''
        try:
            self._socket.close()
        finally:
            # Run the unlink step even if closing the socket raised.
            if self._unlink is not None:
                self._unlink()
284
285
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    A refused connection is retried every 10ms until the deadline from
    `_init_timeout()` has passed; any other socket error, or a refusal
    after the deadline, is logged and re-raised.
    '''
    family = getattr(socket, address_type(address))
    t = _init_timeout()

    while True:
        # A socket whose connect() failed is in an unspecified state, so
        # every attempt starts from a fresh one.
        s = socket.socket(family)
        s.setblocking(True)
        try:
            s.connect(address)
        except socket.error as e:
            s.close()
            if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
                debug('failed to connect to address %s', address)
                raise
            time.sleep(0.01)
        else:
            break

    try:
        fd = duplicate(s.fileno())
    finally:
        # The connection uses the duplicate; always release the socket.
        s.close()
    return _multiprocessing.Connection(fd)
312
313#
314# Definitions for connections based on named pipes
315#
316
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe

        One pipe instance is created up front and kept in
        `self._handle_queue`; each accept() adds a new instance and serves
        the oldest one.  `backlog` is accepted for interface parity with
        `SocketListener` and is not used.
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle()]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # Calling self.close() closes every handle still in the queue.
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self):
            # Create one more duplex, message-mode instance of the pipe.
            return win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )

        def accept(self):
            '''
            Wait for a client and return it as a `PipeConnection`
            '''
            # Queue the replacement instance before serving the oldest one.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
                if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
                                     win32.ERROR_NO_DATA):
                    # The handle has left the queue, so the finalizer would
                    # never close it; do it here.
                    close(handle)
                    raise
            return _multiprocessing.PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Timeouts and "pipe busy" errors are retried until the deadline from
        `_init_timeout()` has passed; any other error is re-raised.
        '''
        t = _init_timeout()
        while True:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        try:
            win32.SetNamedPipeHandleState(
                h, win32.PIPE_READMODE_MESSAGE, None, None
                )
        except Exception:
            # Nothing owns the raw handle yet; close it before propagating.
            close(h)
            raise
        return _multiprocessing.PipeConnection(h)
393
394#
395# Authentication stuff
396#
397
# Number of random bytes in a challenge.
MESSAGE_LENGTH = 20

# Fixed markers exchanged during the handshake.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes, keyed with `authkey`, in reply.  Sends
    WELCOME on a match; otherwise sends FAILURE and raises
    AuthenticationError.  Raises TypeError if `authkey` is not a byte
    string.
    '''
    import hashlib
    import hmac
    # Explicit check instead of `assert`, which is stripped under -O.
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 is what hmac.new() used when no digestmod was given; naming it
    # keeps the wire format and works where digestmod is mandatory.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Prefer a constant-time comparison where the hmac module offers one.
    compare = getattr(hmac, 'compare_digest', None)
    if compare is not None:
        matched = compare(response, digest)
    else:
        matched = response == digest
    if matched:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by `deliver_challenge()` on the other end

    Receives the challenge, replies with its HMAC-MD5 digest keyed with
    `authkey`, and expects WELCOME back.  Raises AuthenticationError if the
    challenge is malformed or the digest is rejected, and TypeError if
    `authkey` is not a byte string.
    '''
    import hashlib
    import hmac
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = connection.recv_bytes(256)         # reject large message
    # Data from the peer is untrusted, so this must not be an `assert`
    # (which would disappear under -O).
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
427 raise AuthenticationError('digest sent was rejected')
428
429#
430# Support for using xmlrpclib for serialization
431#
432
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send() and recv() serialize objects with the
    given `dumps` and `loads` callables

    The byte-level methods of the wrapped connection are exposed unchanged.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Re-export the wrapped connection's bound methods as attributes.
        for name in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        payload = self._dumps(obj)
        self._conn.send_bytes(payload)

    def recv(self):
        payload = self._conn.recv_bytes()
        return self._loads(payload)
447
def _xml_dumps(obj):
    '''
    Return `obj` marshalled as a UTF-8 encoded XML-RPC parameter list
    '''
    # Import locally instead of relying on a module-global `xmlrpclib`,
    # which is only bound once XmlListener.accept() or XmlClient() has run
    # in this process.
    try:
        import xmlrpclib
    except ImportError:                 # renamed to xmlrpc.client in Python 3
        import xmlrpc.client as xmlrpclib
    # Positional arguments: no method name, not a method response, default
    # encoding, allow_none enabled.
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
450
def _xml_loads(s):
    '''
    Return the single object stored in the UTF-8 encoded XML-RPC data `s`
    '''
    # Import locally instead of relying on a module-global `xmlrpclib`,
    # which is only bound once XmlListener.accept() or XmlClient() has run
    # in this process.
    try:
        import xmlrpclib
    except ImportError:                 # renamed to xmlrpc.client in Python 3
        import xmlrpc.client as xmlrpclib
    # The data must carry exactly one parameter; the method name is unused.
    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
    return obj
454
class XmlListener(Listener):
    '''
    Listener whose accepted connections send and receive objects through
    the XML-RPC based `_xml_dumps` / `_xml_loads` serializers
    '''
    def accept(self):
        # Bind `xmlrpclib` as a module-level name on first use.
        global xmlrpclib
        import xmlrpclib
        conn = Listener.accept(self)
        return ConnectionWrapper(conn, _xml_dumps, _xml_loads)
461
def XmlClient(*args, **kwds):
    '''
    Like `Client` (all arguments are passed through), but the returned
    connection sends and receives objects through the XML-RPC based
    `_xml_dumps` / `_xml_loads` serializers
    '''
    # Bind `xmlrpclib` as a module-level name on first use.
    global xmlrpclib
    import xmlrpclib
    conn = Client(*args, **kwds)
    return ConnectionWrapper(conn, _xml_dumps, _xml_loads)