blob: f73faf66b295fb297a96196dc0f05bfd451af84a [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35__all__ = [ 'Client', 'Listener', 'Pipe' ]
36
37import os
38import sys
39import socket
Georg Brandl6aa2d1f2008-08-12 08:35:52 +000040import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000041import time
42import tempfile
43import itertools
Giampaolo Rodola'549d4652013-01-03 02:54:27 +010044import select
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
46import _multiprocessing
Neal Norwitz5d6415e2008-08-25 01:53:32 +000047from multiprocessing import current_process, AuthenticationError
Benjamin Petersone711caf2008-06-11 16:44:04 +000048from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
49from multiprocessing.forking import duplicate, close
50
51
52#
53#
54#
55
56BUFSIZE = 8192
Antoine Pitrou45d61a32009-11-13 22:35:18 +000057# A very generous timeout when it comes to local connections...
58CONNECTION_TIMEOUT = 20.
Benjamin Petersone711caf2008-06-11 16:44:04 +000059
60_mmap_counter = itertools.count()
61
62default_family = 'AF_INET'
63families = ['AF_INET']
64
65if hasattr(socket, 'AF_UNIX'):
66 default_family = 'AF_UNIX'
67 families += ['AF_UNIX']
68
69if sys.platform == 'win32':
70 default_family = 'AF_PIPE'
71 families += ['AF_PIPE']
72
Antoine Pitrou45d61a32009-11-13 22:35:18 +000073
74def _init_timeout(timeout=CONNECTION_TIMEOUT):
75 return time.time() + timeout
76
77def _check_timeout(t):
78 return time.time() > t
79
Benjamin Petersone711caf2008-06-11 16:44:04 +000080#
81#
82#
83
84def arbitrary_address(family):
85 '''
86 Return an arbitrary free address for the given family
87 '''
88 if family == 'AF_INET':
89 return ('localhost', 0)
90 elif family == 'AF_UNIX':
91 return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
92 elif family == 'AF_PIPE':
93 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
94 (os.getpid(), next(_mmap_counter)))
95 else:
96 raise ValueError('unrecognized family')
97
Antoine Pitrou709176f2012-04-01 17:19:09 +020098def _validate_family(family):
99 '''
100 Checks if the family is valid for the current environment.
101 '''
102 if sys.platform != 'win32' and family == 'AF_PIPE':
103 raise ValueError('Family %s is not recognized.' % family)
104
Antoine Pitrou6d20cba2012-04-03 20:12:23 +0200105 if sys.platform == 'win32' and family == 'AF_UNIX':
106 # double check
107 if not hasattr(socket, family):
108 raise ValueError('Family %s is not recognized.' % family)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109
110def address_type(address):
111 '''
112 Return the types of the address
113
114 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
115 '''
116 if type(address) == tuple:
117 return 'AF_INET'
118 elif type(address) is str and address.startswith('\\\\'):
119 return 'AF_PIPE'
120 elif type(address) is str:
121 return 'AF_UNIX'
122 else:
123 raise ValueError('address type of %r unrecognized' % address)
124
125#
126# Public functions
127#
128
129class Listener(object):
130 '''
131 Returns a listener object.
132
133 This is a wrapper for a bound socket which is 'listening' for
134 connections, or for a Windows named pipe.
135 '''
136 def __init__(self, address=None, family=None, backlog=1, authkey=None):
137 family = family or (address and address_type(address)) \
138 or default_family
139 address = address or arbitrary_address(family)
140
Antoine Pitrou709176f2012-04-01 17:19:09 +0200141 _validate_family(family)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000142 if family == 'AF_PIPE':
143 self._listener = PipeListener(address, backlog)
144 else:
145 self._listener = SocketListener(address, family, backlog)
146
147 if authkey is not None and not isinstance(authkey, bytes):
148 raise TypeError('authkey should be a byte string')
149
150 self._authkey = authkey
151
152 def accept(self):
153 '''
154 Accept a connection on the bound socket or named pipe of `self`.
155
156 Returns a `Connection` object.
157 '''
158 c = self._listener.accept()
159 if self._authkey:
160 deliver_challenge(c, self._authkey)
161 answer_challenge(c, self._authkey)
162 return c
163
164 def close(self):
165 '''
166 Close the bound socket or named pipe of `self`.
167 '''
168 return self._listener.close()
169
170 address = property(lambda self: self._listener._address)
171 last_accepted = property(lambda self: self._listener._last_accepted)
172
173
174def Client(address, family=None, authkey=None):
175 '''
176 Returns a connection to the address of a `Listener`
177 '''
178 family = family or address_type(address)
Antoine Pitrou709176f2012-04-01 17:19:09 +0200179 _validate_family(family)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180 if family == 'AF_PIPE':
181 c = PipeClient(address)
182 else:
183 c = SocketClient(address)
184
185 if authkey is not None and not isinstance(authkey, bytes):
186 raise TypeError('authkey should be a byte string')
187
188 if authkey is not None:
189 answer_challenge(c, authkey)
190 deliver_challenge(c, authkey)
191
192 return c
193
194
195if sys.platform != 'win32':
196
197 def Pipe(duplex=True):
198 '''
199 Returns pair of connection objects at either end of a pipe
200 '''
201 if duplex:
202 s1, s2 = socket.socketpair()
Richard Oudkerk4887b1c2012-07-27 14:06:11 +0100203 s1.setblocking(True)
204 s2.setblocking(True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000205 c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
206 c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
207 s1.close()
208 s2.close()
209 else:
210 fd1, fd2 = os.pipe()
211 c1 = _multiprocessing.Connection(fd1, writable=False)
212 c2 = _multiprocessing.Connection(fd2, readable=False)
213
214 return c1, c2
215
216else:
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +0100217 if hasattr(select, 'poll'):
218 def _poll(fds, timeout):
219 if timeout is not None:
220 timeout = int(timeout) * 1000 # timeout is in milliseconds
221 fd_map = {}
222 pollster = select.poll()
223 for fd in fds:
224 pollster.register(fd, select.POLLIN)
225 if hasattr(fd, 'fileno'):
226 fd_map[fd.fileno()] = fd
227 else:
228 fd_map[fd] = fd
229 ls = []
230 for fd, event in pollster.poll(timeout):
231 if event & select.POLLNVAL:
232 raise ValueError('invalid file descriptor %i' % fd)
233 ls.append(fd_map[fd])
234 return ls
235 else:
236 def _poll(fds, timeout):
237 return select.select(fds, [], [], timeout)[0]
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238
Brian Curtina6a32742010-08-04 15:47:24 +0000239 from _multiprocessing import win32
Benjamin Petersone711caf2008-06-11 16:44:04 +0000240
241 def Pipe(duplex=True):
242 '''
243 Returns pair of connection objects at either end of a pipe
244 '''
245 address = arbitrary_address('AF_PIPE')
246 if duplex:
247 openmode = win32.PIPE_ACCESS_DUPLEX
248 access = win32.GENERIC_READ | win32.GENERIC_WRITE
249 obsize, ibsize = BUFSIZE, BUFSIZE
250 else:
251 openmode = win32.PIPE_ACCESS_INBOUND
252 access = win32.GENERIC_WRITE
253 obsize, ibsize = 0, BUFSIZE
254
255 h1 = win32.CreateNamedPipe(
256 address, openmode,
257 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
258 win32.PIPE_WAIT,
259 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
260 )
261 h2 = win32.CreateFile(
262 address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
263 )
264 win32.SetNamedPipeHandleState(
265 h2, win32.PIPE_READMODE_MESSAGE, None, None
266 )
267
268 try:
269 win32.ConnectNamedPipe(h1, win32.NULL)
270 except WindowsError as e:
271 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
272 raise
273
274 c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
275 c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
276
277 return c1, c2
278
279#
280# Definitions for connections based on sockets
281#
282
283class SocketListener(object):
284 '''
Georg Brandl734e2682008-08-12 08:18:18 +0000285 Representation of a socket which is bound to an address and listening
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286 '''
287 def __init__(self, address, family, backlog=1):
288 self._socket = socket.socket(getattr(socket, family))
Charles-François Natali992ca522012-02-04 14:55:53 +0100289 try:
290 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Richard Oudkerk4887b1c2012-07-27 14:06:11 +0100291 self._socket.setblocking(True)
Charles-François Natali992ca522012-02-04 14:55:53 +0100292 self._socket.bind(address)
293 self._socket.listen(backlog)
294 self._address = self._socket.getsockname()
295 except socket.error:
296 self._socket.close()
297 raise
Benjamin Petersone711caf2008-06-11 16:44:04 +0000298 self._family = family
299 self._last_accepted = None
300
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301 if family == 'AF_UNIX':
302 self._unlink = Finalize(
Georg Brandl2ee470f2008-07-16 12:55:28 +0000303 self, os.unlink, args=(address,), exitpriority=0
Benjamin Petersone711caf2008-06-11 16:44:04 +0000304 )
305 else:
306 self._unlink = None
307
308 def accept(self):
309 s, self._last_accepted = self._socket.accept()
Richard Oudkerk4887b1c2012-07-27 14:06:11 +0100310 s.setblocking(True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000311 fd = duplicate(s.fileno())
312 conn = _multiprocessing.Connection(fd)
313 s.close()
314 return conn
315
316 def close(self):
317 self._socket.close()
318 if self._unlink is not None:
319 self._unlink()
320
321
322def SocketClient(address):
323 '''
324 Return a connection object connected to the socket given by `address`
325 '''
326 family = address_type(address)
Victor Stinner2b695062011-01-03 15:47:59 +0000327 with socket.socket( getattr(socket, family) ) as s:
Richard Oudkerk4887b1c2012-07-27 14:06:11 +0100328 s.setblocking(True)
Victor Stinner2b695062011-01-03 15:47:59 +0000329 t = _init_timeout()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330
Victor Stinner2b695062011-01-03 15:47:59 +0000331 while 1:
332 try:
333 s.connect(address)
334 except socket.error as e:
335 if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
336 debug('failed to connect to address %s', address)
337 raise
338 time.sleep(0.01)
339 else:
340 break
Benjamin Petersone711caf2008-06-11 16:44:04 +0000341 else:
Victor Stinner2b695062011-01-03 15:47:59 +0000342 raise
Benjamin Petersone711caf2008-06-11 16:44:04 +0000343
Victor Stinner2b695062011-01-03 15:47:59 +0000344 fd = duplicate(s.fileno())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000345 conn = _multiprocessing.Connection(fd)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000346 return conn
347
348#
349# Definitions for connections based on named pipes
350#
351
352if sys.platform == 'win32':
353
354 class PipeListener(object):
355 '''
356 Representation of a named pipe
357 '''
358 def __init__(self, address, backlog=None):
359 self._address = address
360 handle = win32.CreateNamedPipe(
361 address, win32.PIPE_ACCESS_DUPLEX,
362 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
363 win32.PIPE_WAIT,
364 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
365 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
366 )
367 self._handle_queue = [handle]
368 self._last_accepted = None
369
370 sub_debug('listener created with address=%r', self._address)
371
372 self.close = Finalize(
373 self, PipeListener._finalize_pipe_listener,
374 args=(self._handle_queue, self._address), exitpriority=0
375 )
376
377 def accept(self):
378 newhandle = win32.CreateNamedPipe(
379 self._address, win32.PIPE_ACCESS_DUPLEX,
380 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
381 win32.PIPE_WAIT,
382 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
383 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
384 )
385 self._handle_queue.append(newhandle)
386 handle = self._handle_queue.pop(0)
387 try:
388 win32.ConnectNamedPipe(handle, win32.NULL)
389 except WindowsError as e:
Richard Oudkerk7ef909c2012-05-05 20:41:23 +0100390 # ERROR_NO_DATA can occur if a client has already connected,
391 # written data and then disconnected -- see Issue 14725.
392 if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
393 win32.ERROR_NO_DATA):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000394 raise
395 return _multiprocessing.PipeConnection(handle)
396
397 @staticmethod
398 def _finalize_pipe_listener(queue, address):
399 sub_debug('closing listener with address=%r', address)
400 for handle in queue:
401 close(handle)
402
403 def PipeClient(address):
404 '''
405 Return a connection object connected to the pipe given by `address`
406 '''
Antoine Pitrou45d61a32009-11-13 22:35:18 +0000407 t = _init_timeout()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000408 while 1:
409 try:
410 win32.WaitNamedPipe(address, 1000)
411 h = win32.CreateFile(
412 address, win32.GENERIC_READ | win32.GENERIC_WRITE,
413 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
414 )
415 except WindowsError as e:
416 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
Antoine Pitrou45d61a32009-11-13 22:35:18 +0000417 win32.ERROR_PIPE_BUSY) or _check_timeout(t):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000418 raise
419 else:
420 break
421 else:
422 raise
423
424 win32.SetNamedPipeHandleState(
425 h, win32.PIPE_READMODE_MESSAGE, None, None
426 )
427 return _multiprocessing.PipeConnection(h)
428
429#
430# Authentication stuff
431#
432
433MESSAGE_LENGTH = 20
434
Benjamin Peterson1fcfe212008-06-25 12:54:22 +0000435CHALLENGE = b'#CHALLENGE#'
436WELCOME = b'#WELCOME#'
437FAILURE = b'#FAILURE#'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000438
439def deliver_challenge(connection, authkey):
440 import hmac
441 assert isinstance(authkey, bytes)
442 message = os.urandom(MESSAGE_LENGTH)
443 connection.send_bytes(CHALLENGE + message)
444 digest = hmac.new(authkey, message).digest()
445 response = connection.recv_bytes(256) # reject large message
446 if response == digest:
447 connection.send_bytes(WELCOME)
448 else:
449 connection.send_bytes(FAILURE)
450 raise AuthenticationError('digest received was wrong')
451
452def answer_challenge(connection, authkey):
453 import hmac
454 assert isinstance(authkey, bytes)
455 message = connection.recv_bytes(256) # reject large message
456 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
457 message = message[len(CHALLENGE):]
458 digest = hmac.new(authkey, message).digest()
459 connection.send_bytes(digest)
460 response = connection.recv_bytes(256) # reject large message
461 if response != WELCOME:
462 raise AuthenticationError('digest sent was rejected')
463
464#
465# Support for using xmlrpclib for serialization
466#
467
468class ConnectionWrapper(object):
469 def __init__(self, conn, dumps, loads):
470 self._conn = conn
471 self._dumps = dumps
472 self._loads = loads
473 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
474 obj = getattr(conn, attr)
475 setattr(self, attr, obj)
476 def send(self, obj):
477 s = self._dumps(obj)
478 self._conn.send_bytes(s)
479 def recv(self):
480 s = self._conn.recv_bytes()
481 return self._loads(s)
482
483def _xml_dumps(obj):
484 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
485
486def _xml_loads(s):
487 (obj,), method = xmlrpclib.loads(s.decode('utf8'))
488 return obj
489
490class XmlListener(Listener):
491 def accept(self):
492 global xmlrpclib
493 import xmlrpc.client as xmlrpclib
494 obj = Listener.accept(self)
495 return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
496
497def XmlClient(*args, **kwds):
498 global xmlrpclib
499 import xmlrpc.client as xmlrpclib
500 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)