blob: b6f5bde6e6d14ee3cb2cc53517c0a5fcf0c0d6fe [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9__all__ = [ 'Client', 'Listener', 'Pipe' ]
10
11import os
12import sys
13import socket
14import time
15import tempfile
16import itertools
17
18import _multiprocessing
19from multiprocessing import current_process
20from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
21from multiprocessing.forking import duplicate, close
22
23
24#
25#
26#
27
28BUFSIZE = 8192
29
30_mmap_counter = itertools.count()
31
32default_family = 'AF_INET'
33families = ['AF_INET']
34
35if hasattr(socket, 'AF_UNIX'):
36 default_family = 'AF_UNIX'
37 families += ['AF_UNIX']
38
39if sys.platform == 'win32':
40 default_family = 'AF_PIPE'
41 families += ['AF_PIPE']
42
43#
44#
45#
46
47def arbitrary_address(family):
48 '''
49 Return an arbitrary free address for the given family
50 '''
51 if family == 'AF_INET':
52 return ('localhost', 0)
53 elif family == 'AF_UNIX':
54 return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
55 elif family == 'AF_PIPE':
56 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
57 (os.getpid(), next(_mmap_counter)))
58 else:
59 raise ValueError('unrecognized family')
60
61
62def address_type(address):
63 '''
64 Return the types of the address
65
66 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
67 '''
68 if type(address) == tuple:
69 return 'AF_INET'
70 elif type(address) is str and address.startswith('\\\\'):
71 return 'AF_PIPE'
72 elif type(address) is str:
73 return 'AF_UNIX'
74 else:
75 raise ValueError('address type of %r unrecognized' % address)
76
77#
78# Public functions
79#
80
81class Listener(object):
82 '''
83 Returns a listener object.
84
85 This is a wrapper for a bound socket which is 'listening' for
86 connections, or for a Windows named pipe.
87 '''
88 def __init__(self, address=None, family=None, backlog=1, authkey=None):
89 family = family or (address and address_type(address)) \
90 or default_family
91 address = address or arbitrary_address(family)
92
93 if family == 'AF_PIPE':
94 self._listener = PipeListener(address, backlog)
95 else:
96 self._listener = SocketListener(address, family, backlog)
97
98 if authkey is not None and not isinstance(authkey, bytes):
99 raise TypeError('authkey should be a byte string')
100
101 self._authkey = authkey
102
103 def accept(self):
104 '''
105 Accept a connection on the bound socket or named pipe of `self`.
106
107 Returns a `Connection` object.
108 '''
109 c = self._listener.accept()
110 if self._authkey:
111 deliver_challenge(c, self._authkey)
112 answer_challenge(c, self._authkey)
113 return c
114
115 def close(self):
116 '''
117 Close the bound socket or named pipe of `self`.
118 '''
119 return self._listener.close()
120
121 address = property(lambda self: self._listener._address)
122 last_accepted = property(lambda self: self._listener._last_accepted)
123
124
125def Client(address, family=None, authkey=None):
126 '''
127 Returns a connection to the address of a `Listener`
128 '''
129 family = family or address_type(address)
130 if family == 'AF_PIPE':
131 c = PipeClient(address)
132 else:
133 c = SocketClient(address)
134
135 if authkey is not None and not isinstance(authkey, bytes):
136 raise TypeError('authkey should be a byte string')
137
138 if authkey is not None:
139 answer_challenge(c, authkey)
140 deliver_challenge(c, authkey)
141
142 return c
143
144
145if sys.platform != 'win32':
146
147 def Pipe(duplex=True):
148 '''
149 Returns pair of connection objects at either end of a pipe
150 '''
151 if duplex:
152 s1, s2 = socket.socketpair()
153 c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
154 c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
155 s1.close()
156 s2.close()
157 else:
158 fd1, fd2 = os.pipe()
159 c1 = _multiprocessing.Connection(fd1, writable=False)
160 c2 = _multiprocessing.Connection(fd2, readable=False)
161
162 return c1, c2
163
164else:
165
166 from ._multiprocessing import win32
167
168 def Pipe(duplex=True):
169 '''
170 Returns pair of connection objects at either end of a pipe
171 '''
172 address = arbitrary_address('AF_PIPE')
173 if duplex:
174 openmode = win32.PIPE_ACCESS_DUPLEX
175 access = win32.GENERIC_READ | win32.GENERIC_WRITE
176 obsize, ibsize = BUFSIZE, BUFSIZE
177 else:
178 openmode = win32.PIPE_ACCESS_INBOUND
179 access = win32.GENERIC_WRITE
180 obsize, ibsize = 0, BUFSIZE
181
182 h1 = win32.CreateNamedPipe(
183 address, openmode,
184 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
185 win32.PIPE_WAIT,
186 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
187 )
188 h2 = win32.CreateFile(
189 address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
190 )
191 win32.SetNamedPipeHandleState(
192 h2, win32.PIPE_READMODE_MESSAGE, None, None
193 )
194
195 try:
196 win32.ConnectNamedPipe(h1, win32.NULL)
197 except WindowsError as e:
198 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
199 raise
200
201 c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
202 c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
203
204 return c1, c2
205
206#
207# Definitions for connections based on sockets
208#
209
210class SocketListener(object):
211 '''
Georg Brandl734e2682008-08-12 08:18:18 +0000212 Representation of a socket which is bound to an address and listening
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213 '''
214 def __init__(self, address, family, backlog=1):
215 self._socket = socket.socket(getattr(socket, family))
216 self._socket.bind(address)
217 self._socket.listen(backlog)
218 address = self._socket.getsockname()
219 if type(address) is tuple:
220 address = (socket.getfqdn(address[0]),) + address[1:]
221 self._address = address
222 self._family = family
223 self._last_accepted = None
224
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225 if family == 'AF_UNIX':
226 self._unlink = Finalize(
Georg Brandl2ee470f2008-07-16 12:55:28 +0000227 self, os.unlink, args=(address,), exitpriority=0
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228 )
229 else:
230 self._unlink = None
231
232 def accept(self):
233 s, self._last_accepted = self._socket.accept()
234 fd = duplicate(s.fileno())
235 conn = _multiprocessing.Connection(fd)
236 s.close()
237 return conn
238
239 def close(self):
240 self._socket.close()
241 if self._unlink is not None:
242 self._unlink()
243
244
245def SocketClient(address):
246 '''
247 Return a connection object connected to the socket given by `address`
248 '''
249 family = address_type(address)
250 s = socket.socket( getattr(socket, family) )
251
252 while 1:
253 try:
254 s.connect(address)
255 except socket.error as e:
256 if e.args[0] != 10061: # 10061 => connection refused
257 debug('failed to connect to address %s', address)
258 raise
259 time.sleep(0.01)
260 else:
261 break
262 else:
263 raise
264
265 fd = duplicate(s.fileno())
266 conn = _multiprocessing.Connection(fd)
267 s.close()
268 return conn
269
270#
271# Definitions for connections based on named pipes
272#
273
274if sys.platform == 'win32':
275
276 class PipeListener(object):
277 '''
278 Representation of a named pipe
279 '''
280 def __init__(self, address, backlog=None):
281 self._address = address
282 handle = win32.CreateNamedPipe(
283 address, win32.PIPE_ACCESS_DUPLEX,
284 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
285 win32.PIPE_WAIT,
286 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
287 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
288 )
289 self._handle_queue = [handle]
290 self._last_accepted = None
291
292 sub_debug('listener created with address=%r', self._address)
293
294 self.close = Finalize(
295 self, PipeListener._finalize_pipe_listener,
296 args=(self._handle_queue, self._address), exitpriority=0
297 )
298
299 def accept(self):
300 newhandle = win32.CreateNamedPipe(
301 self._address, win32.PIPE_ACCESS_DUPLEX,
302 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
303 win32.PIPE_WAIT,
304 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
305 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
306 )
307 self._handle_queue.append(newhandle)
308 handle = self._handle_queue.pop(0)
309 try:
310 win32.ConnectNamedPipe(handle, win32.NULL)
311 except WindowsError as e:
312 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
313 raise
314 return _multiprocessing.PipeConnection(handle)
315
316 @staticmethod
317 def _finalize_pipe_listener(queue, address):
318 sub_debug('closing listener with address=%r', address)
319 for handle in queue:
320 close(handle)
321
322 def PipeClient(address):
323 '''
324 Return a connection object connected to the pipe given by `address`
325 '''
326 while 1:
327 try:
328 win32.WaitNamedPipe(address, 1000)
329 h = win32.CreateFile(
330 address, win32.GENERIC_READ | win32.GENERIC_WRITE,
331 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
332 )
333 except WindowsError as e:
334 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
335 win32.ERROR_PIPE_BUSY):
336 raise
337 else:
338 break
339 else:
340 raise
341
342 win32.SetNamedPipeHandleState(
343 h, win32.PIPE_READMODE_MESSAGE, None, None
344 )
345 return _multiprocessing.PipeConnection(h)
346
347#
348# Authentication stuff
349#
350
351MESSAGE_LENGTH = 20
352
Benjamin Peterson1fcfe212008-06-25 12:54:22 +0000353CHALLENGE = b'#CHALLENGE#'
354WELCOME = b'#WELCOME#'
355FAILURE = b'#FAILURE#'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000356
357def deliver_challenge(connection, authkey):
358 import hmac
359 assert isinstance(authkey, bytes)
360 message = os.urandom(MESSAGE_LENGTH)
361 connection.send_bytes(CHALLENGE + message)
362 digest = hmac.new(authkey, message).digest()
363 response = connection.recv_bytes(256) # reject large message
364 if response == digest:
365 connection.send_bytes(WELCOME)
366 else:
367 connection.send_bytes(FAILURE)
368 raise AuthenticationError('digest received was wrong')
369
370def answer_challenge(connection, authkey):
371 import hmac
372 assert isinstance(authkey, bytes)
373 message = connection.recv_bytes(256) # reject large message
374 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
375 message = message[len(CHALLENGE):]
376 digest = hmac.new(authkey, message).digest()
377 connection.send_bytes(digest)
378 response = connection.recv_bytes(256) # reject large message
379 if response != WELCOME:
380 raise AuthenticationError('digest sent was rejected')
381
382#
383# Support for using xmlrpclib for serialization
384#
385
386class ConnectionWrapper(object):
387 def __init__(self, conn, dumps, loads):
388 self._conn = conn
389 self._dumps = dumps
390 self._loads = loads
391 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
392 obj = getattr(conn, attr)
393 setattr(self, attr, obj)
394 def send(self, obj):
395 s = self._dumps(obj)
396 self._conn.send_bytes(s)
397 def recv(self):
398 s = self._conn.recv_bytes()
399 return self._loads(s)
400
401def _xml_dumps(obj):
402 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
403
404def _xml_loads(s):
405 (obj,), method = xmlrpclib.loads(s.decode('utf8'))
406 return obj
407
408class XmlListener(Listener):
409 def accept(self):
410 global xmlrpclib
411 import xmlrpc.client as xmlrpclib
412 obj = Listener.accept(self)
413 return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
414
415def XmlClient(*args, **kwds):
416 global xmlrpclib
417 import xmlrpc.client as xmlrpclib
418 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)