blob: 8807618ed367416f2f0b64d2fcebf2bece56c346 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35__all__ = [ 'Client', 'Listener', 'Pipe' ]
36
Antoine Pitrou87cf2202011-05-09 17:04:27 +020037import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import os
39import sys
Antoine Pitrou87cf2202011-05-09 17:04:27 +020040import pickle
41import select
Benjamin Petersone711caf2008-06-11 16:44:04 +000042import socket
Antoine Pitrou87cf2202011-05-09 17:04:27 +020043import struct
Georg Brandl6aa2d1f2008-08-12 08:35:52 +000044import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000045import time
46import tempfile
47import itertools
48
49import _multiprocessing
Antoine Pitrou87cf2202011-05-09 17:04:27 +020050from multiprocessing import current_process, AuthenticationError, BufferTooShort
Antoine Pitroudd696492011-06-08 17:21:55 +020051from multiprocessing.util import (
52 get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
Antoine Pitrou87cf2202011-05-09 17:04:27 +020053try:
54 from _multiprocessing import win32
Antoine Pitroudd696492011-06-08 17:21:55 +020055 from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
Antoine Pitrou87cf2202011-05-09 17:04:27 +020056except ImportError:
57 if sys.platform == 'win32':
58 raise
59 win32 = None
Benjamin Petersone711caf2008-06-11 16:44:04 +000060
# select.select() wrapped with multiprocessing.util._eintr_retry.
# NOTE(review): presumably the wrapper retries the call when it is
# interrupted by a signal (EINTR) -- confirm against multiprocessing.util.
_select = _eintr_retry(select.select)
62
Benjamin Petersone711caf2008-06-11 16:44:04 +000063#
64#
65#
66
# Size passed as the in/out buffer size when creating Windows named pipes.
BUFSIZE = 8192
# A very generous timeout (in seconds) when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Counter used to make the named-pipe addresses built by
# arbitrary_address() unique within this process.
_mmap_counter = itertools.count()

# Address families usable on this platform; each later check overrides
# the default with the more specific transport when it is available.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']
83
Antoine Pitrou45d61a32009-11-13 22:35:18 +000084
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the wall-clock time at which `timeout` seconds will have passed."""
    deadline = time.time() + timeout
    return deadline
87
88def _check_timeout(t):
89 return time.time() > t
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
92#
93#
94
def arbitrary_address(family):
    '''
    Build a fresh address suitable for a listener of the given family.

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    if family == 'AF_UNIX':
        # A not-yet-existing path inside the multiprocessing temp directory
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    if family == 'AF_PIPE':
        # Pipe name made unique by the pid and a per-process counter
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                                next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix)
    raise ValueError('unrecognized family')
108
109
def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE':

    - a tuple (or tuple subclass, e.g. a namedtuple) is 'AF_INET';
    - a string starting with two backslashes is 'AF_PIPE';
    - any other string is 'AF_UNIX'.

    Raises ValueError for anything else.
    '''
    # isinstance() rather than an exact type comparison, so that subclasses
    # of tuple/str are classified like their base type instead of rejected.
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    raise ValueError('address type of %r unrecognized' % address)
124
Antoine Pitroudd696492011-06-08 17:21:55 +0200125
class SentinelReady(Exception):
    """
    Signals that a sentinel became ready while polling.

    The first positional argument is kept in the `sentinels` attribute.
    """
    def __init__(self, *args):
        super().__init__(*args)
        ready = args[0]
        self.sentinels = ready
133
Benjamin Petersone711caf2008-06-11 16:44:04 +0000134#
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200135# Connection classes
136#
137
138class _ConnectionBase:
139 _handle = None
140
141 def __init__(self, handle, readable=True, writable=True):
142 handle = handle.__index__()
143 if handle < 0:
144 raise ValueError("invalid handle")
145 if not readable and not writable:
146 raise ValueError(
147 "at least one of `readable` and `writable` must be True")
148 self._handle = handle
149 self._readable = readable
150 self._writable = writable
151
Antoine Pitrou60001202011-07-09 01:03:46 +0200152 # XXX should we use util.Finalize instead of a __del__?
153
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200154 def __del__(self):
155 if self._handle is not None:
156 self._close()
157
158 def _check_closed(self):
159 if self._handle is None:
160 raise IOError("handle is closed")
161
162 def _check_readable(self):
163 if not self._readable:
164 raise IOError("connection is write-only")
165
166 def _check_writable(self):
167 if not self._writable:
168 raise IOError("connection is read-only")
169
170 def _bad_message_length(self):
171 if self._writable:
172 self._readable = False
173 else:
174 self.close()
175 raise IOError("bad message length")
176
177 @property
178 def closed(self):
179 """True if the connection is closed"""
180 return self._handle is None
181
182 @property
183 def readable(self):
184 """True if the connection is readable"""
185 return self._readable
186
187 @property
188 def writable(self):
189 """True if the connection is writable"""
190 return self._writable
191
192 def fileno(self):
193 """File descriptor or handle of the connection"""
194 self._check_closed()
195 return self._handle
196
197 def close(self):
198 """Close the connection"""
199 if self._handle is not None:
200 try:
201 self._close()
202 finally:
203 self._handle = None
204
205 def send_bytes(self, buf, offset=0, size=None):
206 """Send the bytes data from a bytes-like object"""
207 self._check_closed()
208 self._check_writable()
209 m = memoryview(buf)
210 # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
211 if m.itemsize > 1:
212 m = memoryview(bytes(m))
213 n = len(m)
214 if offset < 0:
215 raise ValueError("offset is negative")
216 if n < offset:
217 raise ValueError("buffer length < offset")
218 if size is None:
219 size = n - offset
220 elif size < 0:
221 raise ValueError("size is negative")
222 elif offset + size > n:
223 raise ValueError("buffer length < offset + size")
224 self._send_bytes(m[offset:offset + size])
225
226 def send(self, obj):
227 """Send a (picklable) object"""
228 self._check_closed()
229 self._check_writable()
230 buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
231 self._send_bytes(memoryview(buf))
232
233 def recv_bytes(self, maxlength=None):
234 """
235 Receive bytes data as a bytes object.
236 """
237 self._check_closed()
238 self._check_readable()
239 if maxlength is not None and maxlength < 0:
240 raise ValueError("negative maxlength")
241 buf = self._recv_bytes(maxlength)
242 if buf is None:
243 self._bad_message_length()
244 return buf.getvalue()
245
246 def recv_bytes_into(self, buf, offset=0):
247 """
248 Receive bytes data into a writeable buffer-like object.
249 Return the number of bytes read.
250 """
251 self._check_closed()
252 self._check_readable()
253 with memoryview(buf) as m:
254 # Get bytesize of arbitrary buffer
255 itemsize = m.itemsize
256 bytesize = itemsize * len(m)
257 if offset < 0:
258 raise ValueError("negative offset")
259 elif offset > bytesize:
260 raise ValueError("offset too large")
261 result = self._recv_bytes()
262 size = result.tell()
263 if bytesize < offset + size:
264 raise BufferTooShort(result.getvalue())
265 # Message can fit in dest
266 result.seek(0)
267 result.readinto(m[offset // itemsize :
268 (offset + size) // itemsize])
269 return size
270
Antoine Pitroudd696492011-06-08 17:21:55 +0200271 def recv(self, sentinels=None):
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200272 """Receive a (picklable) object"""
273 self._check_closed()
274 self._check_readable()
Antoine Pitroudd696492011-06-08 17:21:55 +0200275 buf = self._recv_bytes(sentinels=sentinels)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200276 return pickle.loads(buf.getbuffer())
277
278 def poll(self, timeout=0.0):
279 """Whether there is any input available to be read"""
280 self._check_closed()
281 self._check_readable()
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200282 return self._poll(timeout)
283
284
if win32:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Data read from the pipe by _poll() that has not been returned to
        # a caller yet; _recv_bytes() consumes it before reading the pipe.
        _buffered = b''

        def _close(self, _CloseHandle=win32.CloseHandle):
            # CloseHandle is captured as a default argument at definition time.
            # NOTE(review): presumably so it is still reachable when __del__
            # runs during interpreter shutdown -- confirm.
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            # Start an overlapped write and wait (True) for it to finish.
            overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
            nwritten, complete = overlapped.GetOverlappedResult(True)
            assert complete
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None, sentinels=()):
            # Returns an io.BytesIO holding one message, or None when the
            # message would exceed `maxsize`.  Raises EOFError when the
            # read fails with ERROR_BROKEN_PIPE.
            if sentinels:
                # Blocks until data arrives; _poll() raises SentinelReady
                # if one of the sentinels is signalled first.
                self._poll(-1.0, sentinels)
            buf = io.BytesIO()
            firstchunk = self._buffered
            if firstchunk:
                # Start from whatever _poll() already pulled off the pipe
                lenfirstchunk = len(firstchunk)
                buf.write(firstchunk)
                self._buffered = b''
            else:
                # A reasonable size for the first chunk transfer
                bufsize = 128
                if maxsize is not None and maxsize < bufsize:
                    bufsize = maxsize
                try:
                    overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
                    lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
                    firstchunk = overlapped.getbuffer()
                    assert lenfirstchunk == len(firstchunk)
                except IOError as e:
                    if e.winerror == win32.ERROR_BROKEN_PIPE:
                        raise EOFError
                    raise
                buf.write(firstchunk)
                if complete:
                    # The whole message fit in the first chunk
                    return buf
            # Ask the pipe how much of the current message is still unread
            navail, nleft = win32.PeekNamedPipe(self._handle)
            if maxsize is not None and lenfirstchunk + nleft > maxsize:
                return None
            if nleft > 0:
                overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
                res, complete = overlapped.GetOverlappedResult(True)
                assert res == nleft
                assert complete
                buf.write(overlapped.getbuffer())
            return buf

        def _poll(self, timeout, sentinels=()):
            # Returns True when data is available, False on timeout.
            # Raises SentinelReady when one of `sentinels` is signalled
            # before any data arrives.  `timeout` is in seconds; a negative
            # value waits indefinitely.
            # Fast non-blocking path
            navail, nleft = win32.PeekNamedPipe(self._handle)
            if navail > 0:
                return True
            elif timeout == 0.0:
                return False
            # Blocking: use overlapped I/O
            if timeout < 0.0:
                timeout = INFINITE
            else:
                # Convert seconds to milliseconds, rounding to nearest
                timeout = int(timeout * 1000 + 0.5)
            overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
            try:
                handles = [overlapped.event]
                handles += sentinels
                res = win32.WaitForMultipleObjects(handles, False, timeout)
            finally:
                # Always cancel overlapped I/O in the same thread
                # (because CancelIoEx() appears only in Vista)
                overlapped.cancel()
            if res == WAIT_TIMEOUT:
                return False
            idx = res - WAIT_OBJECT_0
            if idx == 0:
                # I/O was successful, store received data
                overlapped.GetOverlappedResult(True)
                self._buffered += overlapped.getbuffer()
                return True
            # Any other index refers to one of the sentinel handles
            assert 0 < idx < len(handles)
            raise SentinelReady([handles[idx]])
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200372
373
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).

    Messages are framed on the wire as a 4-byte big-endian signed length
    (struct format "!i") followed by the payload.
    """

    # The low-level primitives are chosen once, at class-definition time,
    # and captured as default arguments of the methods that use them.
    if win32:
        def _close(self, _close=win32.closesocket):
            _close(self._handle)
        _write = win32.send
        _read = win32.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf`, looping over partial writes."""
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, sentinels=(), read=_read):
        """
        Read exactly `size` bytes and return them in an io.BytesIO.

        When `sentinels` is non-empty, each read is preceded by a select()
        on the handle plus the sentinels, and SentinelReady is raised if
        the select returns without the handle being readable.
        Raises EOFError on end of file before any byte was read, and
        IOError on end of file in the middle of the data.
        """
        buf = io.BytesIO()
        handle = self._handle
        if sentinels:
            # Accept any iterable of sentinels (the default is a tuple);
            # concatenating a list with a non-list would raise TypeError.
            handles = [handle] + list(sentinels)
        remaining = size
        while remaining > 0:
            if sentinels:
                r = _select(handles, [], [])[0]
                if handle not in r:
                    raise SentinelReady(r)
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise IOError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        # For wire compatibility with 3.2 and lower
        n = len(buf)
        self._send(struct.pack("!i", n))
        # The condition is necessary to avoid "broken pipe" errors
        # when sending a 0-length buffer if the other end closed the pipe.
        if n > 0:
            self._send(buf)

    def _recv_bytes(self, maxsize=None, sentinels=()):
        """
        Read one framed message; return None if it is larger than `maxsize`.
        """
        buf = self._recv(4, sentinels)
        size, = struct.unpack("!i", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size, sentinels)

    def _poll(self, timeout):
        # A negative timeout means "wait forever", i.e. select() with None
        if timeout < 0.0:
            timeout = None
        r = _select([self._handle], [], [], timeout)[0]
        return bool(r)
443
444
445#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000446# Public functions
447#
448
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.

    `address` and `family` default to an arbitrary address of the
    platform's default family.  When `authkey` (a byte string) is given,
    every accepted connection must pass a mutual HMAC challenge.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate before binding anything, so that a bad authkey does not
        # leave a bound socket / named pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  Raises AuthenticationError if an
        authkey is set and the handshake with the peer fails.
        '''
        c = self._listener.accept()
        # `is not None` (not truthiness) so that the decision matches
        # Client(), which authenticates whenever its authkey is not None.
        if self._authkey is not None:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    # Address the underlying listener is bound to
    address = property(lambda self: self._listener._address)
    # Value recorded by the underlying listener at the last accept()
    last_accepted = property(lambda self: self._listener._last_accepted)
491
492
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` is deduced from `address` when omitted.  When `authkey`
    (a byte string) is given, the mutual HMAC challenge is performed and
    AuthenticationError is raised if it fails.
    '''
    # Validate before connecting, so that a bad authkey does not leave an
    # open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        # Mirror image of Listener.accept(): answer first, then challenge
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c
511
512
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends are readable and writable; otherwise
        the first connection is read-only and the second is write-only.
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            # detach() hands the descriptor over to the Connection object
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends are readable and writable; otherwise
        the first connection is read-only and the second is write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            # One-way pipe: the server end (h1) only receives, the client
            # end (h2) only writes.
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        # Server end of the pipe; a single instance is allowed (the 1).
        h1 = win32.CreateNamedPipe(
            address, openmode | win32.FILE_FLAG_OVERLAPPED |
            win32.FILE_FLAG_FIRST_PIPE_INSTANCE,
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
            win32.PIPE_WAIT,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        # Client end, opened on the pipe that was just created.
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING,
            win32.FILE_FLAG_OVERLAPPED, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        # Complete the server-side connect and wait for it to finish.
        overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
        overlapped.GetOverlappedResult(True)

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2
568
569#
570# Definitions for connections based on sockets
571#
572
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        """
        Bind a socket of `family` (the name of a socket.AF_* constant) to
        `address` and start listening with the given `backlog`.
        """
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except OSError:
            # Do not leak the socket if it could not be set up
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is closed/collected
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        """Accept one connection and return it as a `Connection`."""
        s, self._last_accepted = self._socket.accept()
        # The Connection owns a duplicate of the descriptor; the accepted
        # socket object is closed even if duplicating fails.
        with s:
            fd = duplicate(s.fileno())
        return Connection(fd)

    def close(self):
        """Close the listening socket and, for AF_UNIX, unlink its path."""
        try:
            self._socket.close()
        finally:
            # Still remove the AF_UNIX socket file if closing raised
            if self._unlink is not None:
                self._unlink()
611
612
def SocketClient(address):
    '''
    Connect to the socket listening at `address` and wrap it in a
    `Connection` object, which is returned
    '''
    family_name = address_type(address)
    with socket.socket(getattr(socket, family_name)) as sock:
        sock.connect(address)
        # The Connection gets its own copy of the descriptor; the socket
        # object itself is closed on leaving the `with` block.
        handle = duplicate(sock.fileno())
    return Connection(handle)
623
624#
625# Definitions for connections based on named pipes
626#
627
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        # NOTE(review): the handles created here (and in PipeClient) are not
        # opened with FILE_FLAG_OVERLAPPED, although PipeConnection's
        # docstring says its handles must be -- confirm whether the flag
        # should be added together with an overlapped ConnectNamedPipe.
        def __init__(self, address, backlog=None):
            """
            Create the first instance of the pipe named `address`.
            `backlog` is accepted for interface parity and is not used.
            """
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX |
                win32.FILE_FLAG_FIRST_PIPE_INSTANCE,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            # Pipe instances created but not yet handed out by accept()
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # close() is the finalizer that closes every queued handle
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            """Wait for a client and return it as a `PipeConnection`."""
            # Create the next instance first so that one is always queued,
            # then serve the oldest queued instance.
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # ERROR_PIPE_CONNECTED is tolerated: the connection is
                # returned anyway
                if e.winerror != win32.ERROR_PIPE_CONNECTED:
                    raise
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Retries while the pipe is busy or the wait times out, until the
        deadline from _init_timeout() has passed; the last WindowsError is
        then re-raised.  Any other WindowsError is raised immediately.
        '''
        t = _init_timeout()
        # The loop is only left by `break` (success) or by an exception,
        # so no `else` clause is needed on the `while`.
        while 1:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.winerror not in (win32.ERROR_SEM_TIMEOUT,
                                      win32.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000702
703#
704# Authentication stuff
705#
706
# Number of random bytes in a challenge
MESSAGE_LENGTH = 20

# Protocol markers exchanged during the handshake
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    """
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE plus random bytes, expects the HMAC-MD5 digest of the
    random bytes keyed with `authkey`, and replies WELCOME or FAILURE.
    Raises TypeError if `authkey` is not bytes and AuthenticationError if
    the peer's digest is wrong.
    """
    import hashlib
    import hmac
    # Explicit check instead of `assert`, which is stripped under -O
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 is named explicitly: it is what the protocol has always used and
    # hmac.new() no longer supplies a default digest on current Pythons.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, so timing does not leak the digest
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    """
    Answer a challenge sent by the peer's deliver_challenge().

    Raises TypeError if `authkey` is not bytes and AuthenticationError if
    the peer's message is not a challenge or our digest is rejected.
    """
    import hashlib
    import hmac
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = connection.recv_bytes(256)         # reject large message
    # The message comes from the network: reject it explicitly rather than
    # with an `assert` that disappears under -O.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
737
738#
739# Support for using xmlrpclib for serialization
740#
741
class ConnectionWrapper(object):
    """
    Wrap a connection so that send()/recv() use a custom serializer.

    `dumps` turns an object into the bytes passed to send_bytes();
    `loads` turns the bytes from recv_bytes() back into an object.  The
    byte-level methods of the wrapped connection are exposed unchanged.
    """
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Re-export the bound methods of the wrapped connection
        for name in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        payload = self._dumps(obj)
        self._conn.send_bytes(payload)

    def recv(self):
        payload = self._conn.recv_bytes()
        return self._loads(payload)
756
757def _xml_dumps(obj):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000758 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000759
760def _xml_loads(s):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +0000761 (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000762 return obj
763
class XmlListener(Listener):
    """Listener whose accepted connections serialize objects with XML-RPC."""
    def accept(self):
        # Bind the module-level `xmlrpclib` name on first use
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        raw_conn = Listener.accept(self)
        wrapped = ConnectionWrapper(raw_conn, _xml_dumps, _xml_loads)
        return wrapped
770
def XmlClient(*args, **kwds):
    """Like Client(), but the connection serializes objects with XML-RPC."""
    # Bind the module-level `xmlrpclib` name on first use
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    raw_conn = Client(*args, **kwds)
    return ConnectionWrapper(raw_conn, _xml_dumps, _xml_loads)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200775
776
777# Late import because of circular import
778from multiprocessing.forking import duplicate, close