# (source view: blob f083c54d2a64818cb18adf0e1ff7e19937662ebc)
#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
34
# Public names exported by ``from <this module> import *``.
__all__ = ['Client', 'Listener', 'Pipe']
36
import os
import sys
import socket
import select
import errno
import time
import tempfile
import itertools

import _multiprocessing
from multiprocessing import current_process, AuthenticationError
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
from multiprocessing.forking import duplicate, close
49
50
#
#
#
54
# Buffer size (in bytes) passed to CreateNamedPipe for Windows named pipes.
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Per-process counter used to build unique named-pipe addresses.
_mmap_counter = itertools.count()

# 'AF_INET' is always offered; the checks below append further families
# and override the default, so the last family added is the default one.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families.append('AF_UNIX')

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families.append('AF_PIPE')
Antoine Pitrou45d61a32009-11-13 22:35:18 +000072
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the deadline, as a `time.time()` timestamp, that lies
    `timeout` seconds from now.
    '''
    return time.time() + timeout
75
def _check_timeout(t):
    '''
    Return True if the current `time.time()` is later than deadline `t`.
    '''
    return time.time() > t
78
#
#
#
82
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other
    value raises ValueError.
    '''
    if family == 'AF_INET':
        # Port 0: SocketListener reads the real address back with
        # getsockname() after binding.
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    elif family == 'AF_PIPE':
        # The pid plus a per-process counter go into the pipe name prefix.
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)))
    else:
        raise ValueError('unrecognized family')
96
def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError for 'AF_PIPE' when not running on win32, and for
    'AF_UNIX' on win32 when the socket module has no such attribute.
    Returns None otherwise.
    '''
    on_windows = sys.platform == 'win32'

    if family == 'AF_PIPE' and not on_windows:
        raise ValueError('Family %s is not recognized.' % family)

    # double check: only trust 'AF_UNIX' on win32 if socket really has it
    if family == 'AF_UNIX' and on_windows and not hasattr(socket, family):
        raise ValueError('Family %s is not recognized.' % family)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108
def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'.  Tuples are treated
    as 'AF_INET', strings starting with two backslashes as 'AF_PIPE' and
    any other string as 'AF_UNIX'.  Anything else raises ValueError.
    '''
    # isinstance() also accepts tuple/str subclasses (e.g. namedtuples).
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    # Wrap in a 1-tuple so the value is always formatted as one argument.
    raise ValueError('address type of %r unrecognized' % (address,))
123
#
# Public functions
#
127
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Create the underlying PipeListener or SocketListener.

        `family` defaults to the type of `address`, or to `default_family`
        when no address is given; a missing `address` is chosen with
        `arbitrary_address()`.  `authkey`, if not None, must be bytes
        (TypeError otherwise) and enables the challenge handshake in
        `accept()`.
        '''
        # Validate first: nothing has been bound yet, so a bad authkey
        # cannot leave a listening socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  When an authkey was given, the
        peer is challenged and then challenges us in turn before the
        connection is returned.
        '''
        c = self._listener.accept()
        # `is not None` (not truthiness) so that an empty authkey still
        # triggers the handshake, exactly as Client() does.
        if self._authkey is not None:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    # Read-only views onto the wrapped listener's state.
    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)
171
172
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the type of `address`.  `authkey`, if not None,
    must be bytes (TypeError otherwise); the client then answers the
    listener's challenge and delivers one of its own.
    '''
    # Validate before connecting so a bad authkey cannot leak an open
    # connection.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c
192
193
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends are read/write (built on a socket
        pair); otherwise the first connection is read-only and the second
        write-only (built on os.pipe()).
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            try:
                s1.setblocking(True)
                s2.setblocking(True)
                # The connections get duplicated descriptors, so the
                # socket objects are always closed below.
                c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
                c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            finally:
                s1.close()
                s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2

else:
    # NOTE(review): these _poll helpers are only defined on the win32
    # branch and nothing in this file calls them -- confirm the intended
    # placement against the callers.
    if hasattr(select, 'poll'):
        def _poll(fds, timeout):
            '''
            Return the members of `fds` that poll() reports as having
            events; `timeout` is in seconds, or None.
            '''
            if timeout is not None:
                # poll() takes milliseconds.  Scale before truncating so
                # sub-second timeouts are not rounded down to zero.
                timeout = int(timeout * 1000)
            fd_map = {}
            pollster = select.poll()
            for fd in fds:
                pollster.register(fd, select.POLLIN)
                # poll() reports integer descriptors; map them back to
                # the objects the caller passed in.
                if hasattr(fd, 'fileno'):
                    fd_map[fd.fileno()] = fd
                else:
                    fd_map[fd] = fd
            ls = []
            for fd, event in pollster.poll(timeout):
                if event & select.POLLNVAL:
                    raise ValueError('invalid file descriptor %i' % fd)
                ls.append(fd_map[fd])
            return ls
    else:
        def _poll(fds, timeout):
            '''
            Return the members of `fds` that select() reports readable.
            '''
            return select.select(fds, [], [], timeout)[0]

    from _multiprocessing import win32

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` false the first connection is read-only and the
        second write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        # h1 is the pipe instance created here; h2 opens the same pipe
        # by name.
        h1 = win32.CreateNamedPipe(
            address, openmode,
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
            win32.PIPE_WAIT,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(h1, win32.NULL)
        except WindowsError as e:
            # h2 is already open, so "already connected" is expected.
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)

        return c1, c2
277
#
# Definitions for connections based on sockets
#
281
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        '''
        Bind a blocking socket of the named `family` (an attribute name
        of the socket module) to `address` and start listening.
        '''
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows
            # (CPython issue #2550), so only set it on POSIX.
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except Exception:
            # Close on any failure (not just socket.error) so that e.g. a
            # malformed address does not leak the socket.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when closed or finalized.
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept one connection and return it as a Connection object; the
        peer address is recorded in `_last_accepted`.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            s.setblocking(True)
            # The Connection owns a duplicate, so `s` is always closed.
            fd = duplicate(s.fileno())
        finally:
            s.close()
        return _multiprocessing.Connection(fd)

    def close(self):
        '''
        Close the listening socket and, for AF_UNIX, unlink its file.
        '''
        self._socket.close()
        if self._unlink is not None:
            self._unlink()
319
320
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    Refused connections are retried every 10 ms until the deadline from
    `_init_timeout()` passes; any other socket error, or a refusal after
    the deadline, is logged at debug level and re-raised.
    '''
    family = address_type(address)
    with socket.socket(getattr(socket, family)) as s:
        s.setblocking(True)
        t = _init_timeout()

        while True:
            try:
                s.connect(address)
            except socket.error as e:
                if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
                    debug('failed to connect to address %s', address)
                    raise
                time.sleep(0.01)
            else:
                break

        # Duplicate before the `with` block closes the socket; the
        # Connection takes the duplicate.
        fd = duplicate(s.fileno())
    conn = _multiprocessing.Connection(fd)
    return conn
346
#
# Definitions for connections based on named pipes
#
350
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            '''
            Create the first instance of the pipe named `address`.
            `backlog` is accepted but not used here.
            '''
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # `close` is an instance attribute: calling it closes every
            # handle still in the queue.
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            '''
            Wait for a client on the oldest queued pipe instance and
            return it as a PipeConnection.
            '''
            # Queue a fresh instance before taking the oldest one --
            # presumably so a pipe instance always exists for the next
            # client; confirm against the protocol's intent.
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
                if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
                                     win32.ERROR_NO_DATA):
                    raise
            return _multiprocessing.PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            '''
            Close every pipe handle left in `queue`.
            '''
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Retries while the pipe is busy or the wait times out, until the
        deadline from `_init_timeout()` passes; other errors are re-raised.
        '''
        t = _init_timeout()
        while True:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return _multiprocessing.PipeConnection(h)
427
#
# Authentication stuff
#
431
# Number of random bytes in each challenge.
MESSAGE_LENGTH = 20

# Protocol markers exchanged during the handshake.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE plus MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes keyed with `authkey`.  Replies WELCOME
    on a match; otherwise replies FAILURE and raises AuthenticationError.
    Raises TypeError if `authkey` is not bytes.
    '''
    import hashlib
    import hmac
    # Explicit check rather than `assert`, which -O strips.
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 used to be hmac.new()'s implicit default; name it explicitly
    # because newer Pythons require the digestmod argument.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Prefer a constant-time comparison where the hmac module has one.
    compare = getattr(hmac, 'compare_digest', None)
    if compare is not None:
        matched = compare(response, digest)
    else:
        matched = response == digest
    if matched:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by `deliver_challenge()` on the other end.

    Receives CHALLENGE plus payload, sends back the HMAC-MD5 digest of
    the payload keyed with `authkey`, then expects WELCOME.  Raises
    AuthenticationError if the first message is not a challenge or the
    digest is rejected, and TypeError if `authkey` is not bytes.
    '''
    import hashlib
    import hmac
    # Explicit checks rather than `assert`, which -O strips.
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = connection.recv_bytes(256)         # reject large message
    if not message.startswith(CHALLENGE):
        raise AuthenticationError('message = %r' % (message,))
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
462
#
# Support for using xmlrpclib for serialization
#
466
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use the given `dumps` and
    `loads` callables on top of the connection's byte-level methods.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the wrapped connection's byte-level API unchanged.
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)

    def send(self, obj):
        '''Serialize `obj` with `dumps` and send the resulting bytes.'''
        s = self._dumps(obj)
        self._conn.send_bytes(s)

    def recv(self):
        '''Receive one bytes message and return `loads` applied to it.'''
        s = self._conn.recv_bytes()
        return self._loads(s)
481
def _xml_dumps(obj):
    '''
    Return `obj` marshalled as a one-parameter XML-RPC payload (None
    allowed), encoded as UTF-8 bytes.
    '''
    # Imported locally so this helper does not depend on a module global
    # that is only bound once XmlListener.accept() or XmlClient() has run.
    import xmlrpc.client as xmlrpclib
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
484
def _xml_loads(s):
    '''
    Decode the UTF-8 bytes `s` as an XML-RPC payload and return its
    single parameter.
    '''
    # Imported locally so this helper does not depend on a module global
    # that is only bound once XmlListener.accept() or XmlClient() has run.
    import xmlrpc.client as xmlrpclib
    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
    return obj
488
class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects with XML-RPC
    marshalling.
    '''
    def accept(self):
        '''
        Accept a connection and return it wrapped in a ConnectionWrapper
        that uses `_xml_dumps` / `_xml_loads`.
        '''
        # Bind xmlrpclib as a module global on first use; the _xml_*
        # helpers look the name up at module level.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
495
def XmlClient(*args, **kwds):
    '''
    Like `Client(*args, **kwds)`, but return the connection wrapped in a
    ConnectionWrapper that uses `_xml_dumps` / `_xml_loads`.
    '''
    # Bind xmlrpclib as a module global on first use; the _xml_* helpers
    # look the name up at module level.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)