blob: ede2908eec78c15a854eb60a0845957e71062410 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35__all__ = [ 'Client', 'Listener', 'Pipe' ]
36
Antoine Pitrou87cf2202011-05-09 17:04:27 +020037import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import os
39import sys
Antoine Pitrou87cf2202011-05-09 17:04:27 +020040import pickle
41import select
Benjamin Petersone711caf2008-06-11 16:44:04 +000042import socket
Antoine Pitrou87cf2202011-05-09 17:04:27 +020043import struct
Georg Brandl6aa2d1f2008-08-12 08:35:52 +000044import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000045import time
46import tempfile
47import itertools
48
49import _multiprocessing
Antoine Pitrou87cf2202011-05-09 17:04:27 +020050from multiprocessing import current_process, AuthenticationError, BufferTooShort
Antoine Pitroudd696492011-06-08 17:21:55 +020051from multiprocessing.util import (
52 get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
Antoine Pitrou87cf2202011-05-09 17:04:27 +020053try:
54 from _multiprocessing import win32
Antoine Pitroudd696492011-06-08 17:21:55 +020055 from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
Antoine Pitrou87cf2202011-05-09 17:04:27 +020056except ImportError:
57 if sys.platform == 'win32':
58 raise
59 win32 = None
Benjamin Petersone711caf2008-06-11 16:44:04 +000060
Antoine Pitroudd696492011-06-08 17:21:55 +020061_select = _eintr_retry(select.select)
62
Benjamin Petersone711caf2008-06-11 16:44:04 +000063#
64#
65#
66
67BUFSIZE = 8192
Antoine Pitrou45d61a32009-11-13 22:35:18 +000068# A very generous timeout when it comes to local connections...
69CONNECTION_TIMEOUT = 20.
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71_mmap_counter = itertools.count()
72
73default_family = 'AF_INET'
74families = ['AF_INET']
75
76if hasattr(socket, 'AF_UNIX'):
77 default_family = 'AF_UNIX'
78 families += ['AF_UNIX']
79
80if sys.platform == 'win32':
81 default_family = 'AF_PIPE'
82 families += ['AF_PIPE']
83
Antoine Pitrou45d61a32009-11-13 22:35:18 +000084
85def _init_timeout(timeout=CONNECTION_TIMEOUT):
86 return time.time() + timeout
87
88def _check_timeout(t):
89 return time.time() > t
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
92#
93#
94
95def arbitrary_address(family):
96 '''
97 Return an arbitrary free address for the given family
98 '''
99 if family == 'AF_INET':
100 return ('localhost', 0)
101 elif family == 'AF_UNIX':
102 return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
103 elif family == 'AF_PIPE':
104 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
105 (os.getpid(), next(_mmap_counter)))
106 else:
107 raise ValueError('unrecognized family')
108
109
110def address_type(address):
111 '''
112 Return the types of the address
113
114 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
115 '''
116 if type(address) == tuple:
117 return 'AF_INET'
118 elif type(address) is str and address.startswith('\\\\'):
119 return 'AF_PIPE'
120 elif type(address) is str:
121 return 'AF_UNIX'
122 else:
123 raise ValueError('address type of %r unrecognized' % address)
124
Antoine Pitroudd696492011-06-08 17:21:55 +0200125
126class SentinelReady(Exception):
127 """
128 Raised when a sentinel is ready when polling.
129 """
130 def __init__(self, *args):
131 Exception.__init__(self, *args)
132 self.sentinels = args[0]
133
Benjamin Petersone711caf2008-06-11 16:44:04 +0000134#
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200135# Connection classes
136#
137
138class _ConnectionBase:
139 _handle = None
140
141 def __init__(self, handle, readable=True, writable=True):
142 handle = handle.__index__()
143 if handle < 0:
144 raise ValueError("invalid handle")
145 if not readable and not writable:
146 raise ValueError(
147 "at least one of `readable` and `writable` must be True")
148 self._handle = handle
149 self._readable = readable
150 self._writable = writable
151
152 def __del__(self):
153 if self._handle is not None:
154 self._close()
155
156 def _check_closed(self):
157 if self._handle is None:
158 raise IOError("handle is closed")
159
160 def _check_readable(self):
161 if not self._readable:
162 raise IOError("connection is write-only")
163
164 def _check_writable(self):
165 if not self._writable:
166 raise IOError("connection is read-only")
167
168 def _bad_message_length(self):
169 if self._writable:
170 self._readable = False
171 else:
172 self.close()
173 raise IOError("bad message length")
174
175 @property
176 def closed(self):
177 """True if the connection is closed"""
178 return self._handle is None
179
180 @property
181 def readable(self):
182 """True if the connection is readable"""
183 return self._readable
184
185 @property
186 def writable(self):
187 """True if the connection is writable"""
188 return self._writable
189
190 def fileno(self):
191 """File descriptor or handle of the connection"""
192 self._check_closed()
193 return self._handle
194
195 def close(self):
196 """Close the connection"""
197 if self._handle is not None:
198 try:
199 self._close()
200 finally:
201 self._handle = None
202
203 def send_bytes(self, buf, offset=0, size=None):
204 """Send the bytes data from a bytes-like object"""
205 self._check_closed()
206 self._check_writable()
207 m = memoryview(buf)
208 # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
209 if m.itemsize > 1:
210 m = memoryview(bytes(m))
211 n = len(m)
212 if offset < 0:
213 raise ValueError("offset is negative")
214 if n < offset:
215 raise ValueError("buffer length < offset")
216 if size is None:
217 size = n - offset
218 elif size < 0:
219 raise ValueError("size is negative")
220 elif offset + size > n:
221 raise ValueError("buffer length < offset + size")
222 self._send_bytes(m[offset:offset + size])
223
224 def send(self, obj):
225 """Send a (picklable) object"""
226 self._check_closed()
227 self._check_writable()
228 buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
229 self._send_bytes(memoryview(buf))
230
231 def recv_bytes(self, maxlength=None):
232 """
233 Receive bytes data as a bytes object.
234 """
235 self._check_closed()
236 self._check_readable()
237 if maxlength is not None and maxlength < 0:
238 raise ValueError("negative maxlength")
239 buf = self._recv_bytes(maxlength)
240 if buf is None:
241 self._bad_message_length()
242 return buf.getvalue()
243
244 def recv_bytes_into(self, buf, offset=0):
245 """
246 Receive bytes data into a writeable buffer-like object.
247 Return the number of bytes read.
248 """
249 self._check_closed()
250 self._check_readable()
251 with memoryview(buf) as m:
252 # Get bytesize of arbitrary buffer
253 itemsize = m.itemsize
254 bytesize = itemsize * len(m)
255 if offset < 0:
256 raise ValueError("negative offset")
257 elif offset > bytesize:
258 raise ValueError("offset too large")
259 result = self._recv_bytes()
260 size = result.tell()
261 if bytesize < offset + size:
262 raise BufferTooShort(result.getvalue())
263 # Message can fit in dest
264 result.seek(0)
265 result.readinto(m[offset // itemsize :
266 (offset + size) // itemsize])
267 return size
268
Antoine Pitroudd696492011-06-08 17:21:55 +0200269 def recv(self, sentinels=None):
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200270 """Receive a (picklable) object"""
271 self._check_closed()
272 self._check_readable()
Antoine Pitroudd696492011-06-08 17:21:55 +0200273 buf = self._recv_bytes(sentinels=sentinels)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200274 return pickle.loads(buf.getbuffer())
275
276 def poll(self, timeout=0.0):
277 """Whether there is any input available to be read"""
278 self._check_closed()
279 self._check_readable()
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200280 return self._poll(timeout)
281
282
283if win32:
284
285 class PipeConnection(_ConnectionBase):
286 """
287 Connection class based on a Windows named pipe.
Antoine Pitroudd696492011-06-08 17:21:55 +0200288 Overlapped I/O is used, so the handles must have been created
289 with FILE_FLAG_OVERLAPPED.
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200290 """
Antoine Pitroudd696492011-06-08 17:21:55 +0200291 _buffered = b''
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200292
293 def _close(self):
294 win32.CloseHandle(self._handle)
295
296 def _send_bytes(self, buf):
Antoine Pitroudd696492011-06-08 17:21:55 +0200297 overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
298 nwritten, complete = overlapped.GetOverlappedResult(True)
299 assert complete
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200300 assert nwritten == len(buf)
301
Antoine Pitroudd696492011-06-08 17:21:55 +0200302 def _recv_bytes(self, maxsize=None, sentinels=()):
303 if sentinels:
304 self._poll(-1.0, sentinels)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200305 buf = io.BytesIO()
Antoine Pitroudd696492011-06-08 17:21:55 +0200306 firstchunk = self._buffered
307 if firstchunk:
308 lenfirstchunk = len(firstchunk)
309 buf.write(firstchunk)
310 self._buffered = b''
311 else:
312 # A reasonable size for the first chunk transfer
313 bufsize = 128
314 if maxsize is not None and maxsize < bufsize:
315 bufsize = maxsize
316 try:
317 overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
318 lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
319 firstchunk = overlapped.getbuffer()
320 assert lenfirstchunk == len(firstchunk)
321 except IOError as e:
322 if e.errno == win32.ERROR_BROKEN_PIPE:
323 raise EOFError
324 raise
325 buf.write(firstchunk)
326 if complete:
327 return buf
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200328 navail, nleft = win32.PeekNamedPipe(self._handle)
329 if maxsize is not None and lenfirstchunk + nleft > maxsize:
330 return None
Antoine Pitroudd696492011-06-08 17:21:55 +0200331 if nleft > 0:
332 overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
333 res, complete = overlapped.GetOverlappedResult(True)
334 assert res == nleft
335 assert complete
336 buf.write(overlapped.getbuffer())
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200337 return buf
338
Antoine Pitroudd696492011-06-08 17:21:55 +0200339 def _poll(self, timeout, sentinels=()):
340 # Fast non-blocking path
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200341 navail, nleft = win32.PeekNamedPipe(self._handle)
342 if navail > 0:
343 return True
344 elif timeout == 0.0:
345 return False
Antoine Pitroudd696492011-06-08 17:21:55 +0200346 # Blocking: use overlapped I/O
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200347 if timeout < 0.0:
Antoine Pitroudd696492011-06-08 17:21:55 +0200348 timeout = INFINITE
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200349 else:
Antoine Pitroudd696492011-06-08 17:21:55 +0200350 timeout = int(timeout * 1000 + 0.5)
351 overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
352 try:
353 handles = [overlapped.event]
354 handles += sentinels
355 res = win32.WaitForMultipleObjects(handles, False, timeout)
356 finally:
357 # Always cancel overlapped I/O in the same thread
358 # (because CancelIoEx() appears only in Vista)
359 overlapped.cancel()
360 if res == WAIT_TIMEOUT:
361 return False
362 idx = res - WAIT_OBJECT_0
363 if idx == 0:
364 # I/O was successful, store received data
365 overlapped.GetOverlappedResult(True)
366 self._buffered += overlapped.getbuffer()
367 return True
368 assert 0 < idx < len(handles)
369 raise SentinelReady([handles[idx]])
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200370
371
372class Connection(_ConnectionBase):
373 """
374 Connection class based on an arbitrary file descriptor (Unix only), or
375 a socket handle (Windows).
376 """
377
378 if win32:
379 def _close(self):
380 win32.closesocket(self._handle)
381 _write = win32.send
382 _read = win32.recv
383 else:
384 def _close(self):
385 os.close(self._handle)
386 _write = os.write
387 _read = os.read
388
389 def _send(self, buf, write=_write):
390 remaining = len(buf)
391 while True:
392 n = write(self._handle, buf)
393 remaining -= n
394 if remaining == 0:
395 break
396 buf = buf[n:]
397
Antoine Pitroudd696492011-06-08 17:21:55 +0200398 def _recv(self, size, sentinels=(), read=_read):
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200399 buf = io.BytesIO()
Antoine Pitroudd696492011-06-08 17:21:55 +0200400 handle = self._handle
401 if sentinels:
402 handles = [handle] + sentinels
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200403 remaining = size
404 while remaining > 0:
Antoine Pitroudd696492011-06-08 17:21:55 +0200405 if sentinels:
406 r = _select(handles, [], [])[0]
407 if handle not in r:
408 raise SentinelReady(r)
409 chunk = read(handle, remaining)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200410 n = len(chunk)
411 if n == 0:
412 if remaining == size:
413 raise EOFError
414 else:
415 raise IOError("got end of file during message")
416 buf.write(chunk)
417 remaining -= n
418 return buf
419
420 def _send_bytes(self, buf):
421 # For wire compatibility with 3.2 and lower
422 n = len(buf)
423 self._send(struct.pack("=i", len(buf)))
424 # The condition is necessary to avoid "broken pipe" errors
425 # when sending a 0-length buffer if the other end closed the pipe.
426 if n > 0:
427 self._send(buf)
428
Antoine Pitroudd696492011-06-08 17:21:55 +0200429 def _recv_bytes(self, maxsize=None, sentinels=()):
430 buf = self._recv(4, sentinels)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200431 size, = struct.unpack("=i", buf.getvalue())
432 if maxsize is not None and size > maxsize:
433 return None
Antoine Pitroudd696492011-06-08 17:21:55 +0200434 return self._recv(size, sentinels)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200435
436 def _poll(self, timeout):
Antoine Pitroudd696492011-06-08 17:21:55 +0200437 if timeout < 0.0:
438 timeout = None
439 r = _select([self._handle], [], [], timeout)[0]
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200440 return bool(r)
441
442
443#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000444# Public functions
445#
446
447class Listener(object):
448 '''
449 Returns a listener object.
450
451 This is a wrapper for a bound socket which is 'listening' for
452 connections, or for a Windows named pipe.
453 '''
454 def __init__(self, address=None, family=None, backlog=1, authkey=None):
455 family = family or (address and address_type(address)) \
456 or default_family
457 address = address or arbitrary_address(family)
458
459 if family == 'AF_PIPE':
460 self._listener = PipeListener(address, backlog)
461 else:
462 self._listener = SocketListener(address, family, backlog)
463
464 if authkey is not None and not isinstance(authkey, bytes):
465 raise TypeError('authkey should be a byte string')
466
467 self._authkey = authkey
468
469 def accept(self):
470 '''
471 Accept a connection on the bound socket or named pipe of `self`.
472
473 Returns a `Connection` object.
474 '''
475 c = self._listener.accept()
476 if self._authkey:
477 deliver_challenge(c, self._authkey)
478 answer_challenge(c, self._authkey)
479 return c
480
481 def close(self):
482 '''
483 Close the bound socket or named pipe of `self`.
484 '''
485 return self._listener.close()
486
487 address = property(lambda self: self._listener._address)
488 last_accepted = property(lambda self: self._listener._last_accepted)
489
490
491def Client(address, family=None, authkey=None):
492 '''
493 Returns a connection to the address of a `Listener`
494 '''
495 family = family or address_type(address)
496 if family == 'AF_PIPE':
497 c = PipeClient(address)
498 else:
499 c = SocketClient(address)
500
501 if authkey is not None and not isinstance(authkey, bytes):
502 raise TypeError('authkey should be a byte string')
503
504 if authkey is not None:
505 answer_challenge(c, authkey)
506 deliver_challenge(c, authkey)
507
508 return c
509
510
511if sys.platform != 'win32':
512
513 def Pipe(duplex=True):
514 '''
515 Returns pair of connection objects at either end of a pipe
516 '''
517 if duplex:
518 s1, s2 = socket.socketpair()
Antoine Pitrou5aa878c2011-05-09 21:00:28 +0200519 c1 = Connection(s1.detach())
520 c2 = Connection(s2.detach())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000521 else:
522 fd1, fd2 = os.pipe()
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200523 c1 = Connection(fd1, writable=False)
524 c2 = Connection(fd2, readable=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000525
526 return c1, c2
527
528else:
529
Benjamin Petersone711caf2008-06-11 16:44:04 +0000530 def Pipe(duplex=True):
531 '''
532 Returns pair of connection objects at either end of a pipe
533 '''
534 address = arbitrary_address('AF_PIPE')
535 if duplex:
536 openmode = win32.PIPE_ACCESS_DUPLEX
537 access = win32.GENERIC_READ | win32.GENERIC_WRITE
538 obsize, ibsize = BUFSIZE, BUFSIZE
539 else:
540 openmode = win32.PIPE_ACCESS_INBOUND
541 access = win32.GENERIC_WRITE
542 obsize, ibsize = 0, BUFSIZE
543
544 h1 = win32.CreateNamedPipe(
Antoine Pitroudd696492011-06-08 17:21:55 +0200545 address, openmode | win32.FILE_FLAG_OVERLAPPED,
Benjamin Petersone711caf2008-06-11 16:44:04 +0000546 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
547 win32.PIPE_WAIT,
548 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
549 )
550 h2 = win32.CreateFile(
Antoine Pitroudd696492011-06-08 17:21:55 +0200551 address, access, 0, win32.NULL, win32.OPEN_EXISTING,
552 win32.FILE_FLAG_OVERLAPPED, win32.NULL
Benjamin Petersone711caf2008-06-11 16:44:04 +0000553 )
554 win32.SetNamedPipeHandleState(
555 h2, win32.PIPE_READMODE_MESSAGE, None, None
556 )
557
Antoine Pitroudd696492011-06-08 17:21:55 +0200558 overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
559 overlapped.GetOverlappedResult(True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000560
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200561 c1 = PipeConnection(h1, writable=duplex)
562 c2 = PipeConnection(h2, readable=duplex)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000563
564 return c1, c2
565
566#
567# Definitions for connections based on sockets
568#
569
570class SocketListener(object):
571 '''
Georg Brandl734e2682008-08-12 08:18:18 +0000572 Representation of a socket which is bound to an address and listening
Benjamin Petersone711caf2008-06-11 16:44:04 +0000573 '''
574 def __init__(self, address, family, backlog=1):
575 self._socket = socket.socket(getattr(socket, family))
Jesse Nollerc5d28a02009-03-30 16:37:36 +0000576 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000577 self._socket.bind(address)
578 self._socket.listen(backlog)
Georg Brandl6aa2d1f2008-08-12 08:35:52 +0000579 self._address = self._socket.getsockname()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000580 self._family = family
581 self._last_accepted = None
582
Benjamin Petersone711caf2008-06-11 16:44:04 +0000583 if family == 'AF_UNIX':
584 self._unlink = Finalize(
Georg Brandl2ee470f2008-07-16 12:55:28 +0000585 self, os.unlink, args=(address,), exitpriority=0
Benjamin Petersone711caf2008-06-11 16:44:04 +0000586 )
587 else:
588 self._unlink = None
589
590 def accept(self):
591 s, self._last_accepted = self._socket.accept()
592 fd = duplicate(s.fileno())
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200593 conn = Connection(fd)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000594 s.close()
595 return conn
596
597 def close(self):
598 self._socket.close()
599 if self._unlink is not None:
600 self._unlink()
601
602
603def SocketClient(address):
604 '''
605 Return a connection object connected to the socket given by `address`
606 '''
607 family = address_type(address)
Victor Stinner2b695062011-01-03 15:47:59 +0000608 with socket.socket( getattr(socket, family) ) as s:
609 t = _init_timeout()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000610
Victor Stinner2b695062011-01-03 15:47:59 +0000611 while 1:
612 try:
613 s.connect(address)
614 except socket.error as e:
615 if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
616 debug('failed to connect to address %s', address)
617 raise
618 time.sleep(0.01)
619 else:
620 break
Benjamin Petersone711caf2008-06-11 16:44:04 +0000621 else:
Victor Stinner2b695062011-01-03 15:47:59 +0000622 raise
Benjamin Petersone711caf2008-06-11 16:44:04 +0000623
Victor Stinner2b695062011-01-03 15:47:59 +0000624 fd = duplicate(s.fileno())
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200625 conn = Connection(fd)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000626 return conn
627
628#
629# Definitions for connections based on named pipes
630#
631
632if sys.platform == 'win32':
633
634 class PipeListener(object):
635 '''
636 Representation of a named pipe
637 '''
638 def __init__(self, address, backlog=None):
639 self._address = address
640 handle = win32.CreateNamedPipe(
641 address, win32.PIPE_ACCESS_DUPLEX,
642 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
643 win32.PIPE_WAIT,
644 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
645 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
646 )
647 self._handle_queue = [handle]
648 self._last_accepted = None
649
650 sub_debug('listener created with address=%r', self._address)
651
652 self.close = Finalize(
653 self, PipeListener._finalize_pipe_listener,
654 args=(self._handle_queue, self._address), exitpriority=0
655 )
656
657 def accept(self):
658 newhandle = win32.CreateNamedPipe(
659 self._address, win32.PIPE_ACCESS_DUPLEX,
660 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
661 win32.PIPE_WAIT,
662 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
663 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
664 )
665 self._handle_queue.append(newhandle)
666 handle = self._handle_queue.pop(0)
667 try:
668 win32.ConnectNamedPipe(handle, win32.NULL)
669 except WindowsError as e:
670 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
671 raise
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200672 return PipeConnection(handle)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000673
674 @staticmethod
675 def _finalize_pipe_listener(queue, address):
676 sub_debug('closing listener with address=%r', address)
677 for handle in queue:
678 close(handle)
679
680 def PipeClient(address):
681 '''
682 Return a connection object connected to the pipe given by `address`
683 '''
Antoine Pitrou45d61a32009-11-13 22:35:18 +0000684 t = _init_timeout()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000685 while 1:
686 try:
687 win32.WaitNamedPipe(address, 1000)
688 h = win32.CreateFile(
689 address, win32.GENERIC_READ | win32.GENERIC_WRITE,
690 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
691 )
692 except WindowsError as e:
693 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
Antoine Pitrou45d61a32009-11-13 22:35:18 +0000694 win32.ERROR_PIPE_BUSY) or _check_timeout(t):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000695 raise
696 else:
697 break
698 else:
699 raise
700
701 win32.SetNamedPipeHandleState(
702 h, win32.PIPE_READMODE_MESSAGE, None, None
703 )
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200704 return PipeConnection(h)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000705
706#
707# Authentication stuff
708#
709
710MESSAGE_LENGTH = 20
711
Benjamin Peterson1fcfe212008-06-25 12:54:22 +0000712CHALLENGE = b'#CHALLENGE#'
713WELCOME = b'#WELCOME#'
714FAILURE = b'#FAILURE#'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000715
716def deliver_challenge(connection, authkey):
717 import hmac
718 assert isinstance(authkey, bytes)
719 message = os.urandom(MESSAGE_LENGTH)
720 connection.send_bytes(CHALLENGE + message)
721 digest = hmac.new(authkey, message).digest()
722 response = connection.recv_bytes(256) # reject large message
723 if response == digest:
724 connection.send_bytes(WELCOME)
725 else:
726 connection.send_bytes(FAILURE)
727 raise AuthenticationError('digest received was wrong')
728
729def answer_challenge(connection, authkey):
730 import hmac
731 assert isinstance(authkey, bytes)
732 message = connection.recv_bytes(256) # reject large message
733 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
734 message = message[len(CHALLENGE):]
735 digest = hmac.new(authkey, message).digest()
736 connection.send_bytes(digest)
737 response = connection.recv_bytes(256) # reject large message
738 if response != WELCOME:
739 raise AuthenticationError('digest sent was rejected')
740
741#
742# Support for using xmlrpclib for serialization
743#
744
745class ConnectionWrapper(object):
746 def __init__(self, conn, dumps, loads):
747 self._conn = conn
748 self._dumps = dumps
749 self._loads = loads
750 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
751 obj = getattr(conn, attr)
752 setattr(self, attr, obj)
753 def send(self, obj):
754 s = self._dumps(obj)
755 self._conn.send_bytes(s)
756 def recv(self):
757 s = self._conn.recv_bytes()
758 return self._loads(s)
759
760def _xml_dumps(obj):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000761 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000762
763def _xml_loads(s):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000764 (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000765 return obj
766
767class XmlListener(Listener):
768 def accept(self):
769 global xmlrpclib
770 import xmlrpc.client as xmlrpclib
771 obj = Listener.accept(self)
772 return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
773
774def XmlClient(*args, **kwds):
775 global xmlrpclib
776 import xmlrpc.client as xmlrpclib
777 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200778
779
780# Late import because of circular import
781from multiprocessing.forking import duplicate, close