blob: 415e21066ae58b9b78627020d950c6e1fbe21334 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35__all__ = [ 'Client', 'Listener', 'Pipe' ]
36
Antoine Pitrou87cf2202011-05-09 17:04:27 +020037import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import os
39import sys
Antoine Pitrou87cf2202011-05-09 17:04:27 +020040import pickle
41import select
Benjamin Petersone711caf2008-06-11 16:44:04 +000042import socket
Antoine Pitrou87cf2202011-05-09 17:04:27 +020043import struct
Georg Brandl6aa2d1f2008-08-12 08:35:52 +000044import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000045import time
46import tempfile
47import itertools
48
49import _multiprocessing
Antoine Pitrou87cf2202011-05-09 17:04:27 +020050from multiprocessing import current_process, AuthenticationError, BufferTooShort
Benjamin Petersone711caf2008-06-11 16:44:04 +000051from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
Antoine Pitrou87cf2202011-05-09 17:04:27 +020052try:
53 from _multiprocessing import win32
54except ImportError:
55 if sys.platform == 'win32':
56 raise
57 win32 = None
Benjamin Petersone711caf2008-06-11 16:44:04 +000058
59#
60#
61#
62
# Size in bytes of the in/out buffers requested for Windows named pipes.
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Source of unique suffixes for generated named-pipe addresses.
_mmap_counter = itertools.count()

# Address families supported on this platform; the default is the most
# specific one available (named pipes > Unix sockets > TCP sockets).
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return a deadline `timeout` seconds from now.

    The deadline is expressed on the monotonic clock so that adjustments
    of the system (wall) clock cannot shorten or extend the timeout.  It
    is only meaningful when passed to `_check_timeout()`.
    '''
    return time.monotonic() + timeout

def _check_timeout(t):
    '''
    Return True if the deadline `t` (from `_init_timeout()`) has passed.
    '''
    return time.monotonic() > t
86
Benjamin Petersone711caf2008-06-11 16:44:04 +000087#
88#
89#
90
def arbitrary_address(family):
    '''
    Return an arbitrary, currently unused address for `family`.

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        # Port 0 asks the OS to choose a free port at bind time.
        return ('localhost', 0)
    if family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    if family == 'AF_PIPE':
        # Pid plus a per-process counter keeps generated pipe names distinct.
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                               next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix)
    raise ValueError('unrecognized family')
104
105
def address_type(address):
    '''
    Return the address family implied by the type of `address`.

    A tuple is 'AF_INET', a string starting with two backslashes is
    'AF_PIPE' and any other string is 'AF_UNIX'.  Subclasses of tuple and
    str (e.g. named tuples) are accepted.  Anything else raises ValueError.
    '''
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    # Wrap in a 1-tuple so the value is always formatted as a single item.
    raise ValueError('address type of %r unrecognized' % (address,))
120
121#
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200122# Connection classes
123#
124
class _ConnectionBase:
    """
    Common logic for message-oriented connection objects.

    Subclasses supply the transport through four hooks: `_close()`,
    `_send_bytes(buf)`, `_recv_bytes(maxsize=None)` (returning an
    `io.BytesIO` positioned at the end of the message, or None when the
    message exceeds `maxsize`) and `_poll(timeout)`.
    """
    # Class-level default so that __del__ is safe even when __init__
    # raised before the instance attribute was assigned.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        if self._handle is None:
            raise IOError("handle is closed")

    def _check_readable(self):
        if not self._readable:
            raise IOError("connection is write-only")

    def _check_writable(self):
        if not self._writable:
            raise IOError("connection is read-only")

    def _bad_message_length(self):
        # The oversized message is left unread, so no further reads are
        # allowed: keep a writable connection open for writing only,
        # otherwise close it.
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise IOError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Mark as closed even if the underlying close failed.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """
        Send `size` bytes of the bytes-like object `buf`, starting at
        `offset`, as one message.  `size` defaults to the rest of the buffer.
        """
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if m.itemsize > 1:
            m = memoryview(bytes(m))
        n = len(m)
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        self._send_bytes(memoryview(buf))

    def recv_bytes(self, maxlength=None):
        """
        Receive one message and return it as a bytes object.

        Raises ValueError for a negative `maxlength` and IOError when the
        incoming message is longer than `maxlength`.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable buffer-like object.
        Return the number of bytes read.

        `offset` is a byte offset into `buf`.  BufferTooShort (carrying the
        complete message) is raised when the message does not fit.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            # The stream position is at the end of the message.
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        # NOTE: unpickles whatever the peer sent; only use with peers
        # that are trusted or have been authenticated.
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        return pickle.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """
        Whether there is any input available to be read.

        `timeout` is in seconds; None or a negative value means block
        until input is available.
        """
        self._check_closed()
        self._check_readable()
        # Compare only real numbers: `None < 0.0` raises TypeError.
        if timeout is not None and timeout < 0.0:
            timeout = None
        return self._poll(timeout)
270
271
if win32:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        """

        def _close(self):
            win32.CloseHandle(self._handle)

        def _send_bytes(self, buf):
            nwritten = win32.WriteFile(self._handle, buf)
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            """
            Read one message and return it in a BytesIO, or return None
            when the message is longer than `maxsize`.
            """
            buf = io.BytesIO()
            bufsize = 512
            if maxsize is not None:
                bufsize = min(bufsize, maxsize)
            try:
                firstchunk, complete = win32.ReadFile(self._handle, bufsize)
            except IOError as e:
                if e.errno == win32.ERROR_BROKEN_PIPE:
                    raise EOFError
                raise
            lenfirstchunk = len(firstchunk)
            buf.write(firstchunk)
            if complete:
                return buf
            # The message did not fit in the first read: find out how much
            # of it is left and fetch the remainder in one go.
            navail, nleft = win32.PeekNamedPipe(self._handle)
            if maxsize is not None and lenfirstchunk + nleft > maxsize:
                return None
            lastchunk, complete = win32.ReadFile(self._handle, nleft)
            assert complete
            buf.write(lastchunk)
            return buf

        def _poll(self, timeout):
            """
            Return True once data is available, or False after `timeout`
            seconds.  None or a negative `timeout` waits indefinitely.
            """
            navail, nleft = win32.PeekNamedPipe(self._handle)
            if navail > 0:
                return True
            elif timeout == 0.0:
                return False
            # Setup a polling loop (translated straight from old
            # pipe_connection.c)
            # _ConnectionBase.poll() passes None for "block forever", so
            # None must be checked before any numeric comparison.
            if timeout is None or timeout < 0.0:
                deadline = None
            else:
                deadline = time.time() + timeout
            delay = 0.001
            max_delay = 0.02
            while True:
                time.sleep(delay)
                navail, nleft = win32.PeekNamedPipe(self._handle)
                if navail > 0:
                    return True
                if deadline is not None and time.time() > deadline:
                    return False
                # Back off gradually up to max_delay between peeks.
                if delay < max_delay:
                    delay += 0.001
331 delay += 0.001
332
333
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).

    Every message travels as a 4-byte native-order signed length header
    followed by that many payload bytes.
    """

    # Pick the low-level close/write/read primitives once, at class
    # creation time.
    if win32:
        def _close(self):
            win32.closesocket(self._handle)
        _write = win32.send
        _read = win32.recv
    else:
        def _close(self):
            os.close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf`, retrying after partial writes."""
        pending = len(buf)
        while True:
            sent = write(self._handle, buf)
            pending -= sent
            if pending == 0:
                return
            buf = buf[sent:]

    def _recv(self, size, read=_read):
        """
        Read exactly `size` bytes and return them in a BytesIO.

        Raises EOFError if the stream ends before any byte arrives and
        IOError if it ends part-way through.
        """
        received = io.BytesIO()
        missing = size
        while missing > 0:
            chunk = read(self._handle, missing)
            got = len(chunk)
            if got == 0:
                if missing == size:
                    raise EOFError
                raise IOError("got end of file during message")
            received.write(chunk)
            missing -= got
        return received

    def _send_bytes(self, buf):
        # For wire compatibility with 3.2 and lower
        length = len(buf)
        header = struct.pack("=i", length)
        self._send(header)
        # The condition is necessary to avoid "broken pipe" errors
        # when sending a 0-length buffer if the other end closed the pipe.
        if length > 0:
            self._send(buf)

    def _recv_bytes(self, maxsize=None):
        header = self._recv(4).getvalue()
        (size,) = struct.unpack("=i", header)
        if maxsize is not None and size > maxsize:
            # Signal "message too long" to the caller.
            return None
        return self._recv(size)

    def _poll(self, timeout):
        ready, _, _ = select.select([self._handle], [], [], timeout)
        return bool(ready)
394
395
396#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000397# Public functions
398#
399
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.

    `family` defaults to the family implied by `address`, or to
    `default_family` when no address is given; `address` defaults to an
    arbitrary one for that family.  When `authkey` (bytes) is given, every
    accepted connection must pass a mutual challenge/response handshake.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate first so that a bad authkey cannot leave a freshly
        # bound socket or named pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  If the listener has an authkey and
        the handshake fails, the new connection is closed and the error
        (e.g. AuthenticationError) is re-raised.
        '''
        c = self._listener.accept()
        # `is not None` matches the test made by Client(), so an empty
        # key is treated the same way on both ends.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except Exception:
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)
442
443
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family implied by `address`.  When `authkey`
    (bytes) is given, the challenge/response handshake is performed; if
    it fails the connection is closed and the error is re-raised.
    '''
    # Validate before connecting so that a bad authkey cannot leak an
    # open connection.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            # Mirror image of Listener.accept(): answer first, then
            # challenge the listener in turn.
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except Exception:
            c.close()
            raise

    return c
462
463
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends can send and receive; otherwise the
        first connection is read-only and the second is write-only.
        '''
        if not duplex:
            read_fd, write_fd = os.pipe()
            receiver = Connection(read_fd, writable=False)
            sender = Connection(write_fd, readable=False)
            return receiver, sender

        # detach() hands ownership of each descriptor to its Connection.
        sock_a, sock_b = socket.socketpair()
        end_a = Connection(sock_a.detach())
        end_b = Connection(sock_b.detach())
        return end_a, end_b

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends can send and receive; otherwise the
        first connection is read-only and the second is write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize = ibsize = BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        pipe_mode = (win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                     win32.PIPE_WAIT)

        # Server end of the pipe (single instance).
        server = win32.CreateNamedPipe(
            address, openmode, pipe_mode,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        # Client end, switched to message read mode.
        client = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            client, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(server, win32.NULL)
        except WindowsError as e:
            # "Already connected" is expected: the client end is open.
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        return (PipeConnection(server, writable=duplex),
                PipeConnection(client, readable=duplex))
520
521#
522# Definitions for connections based on sockets
523#
524
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            self._socket.setsockopt(socket.SOL_SOCKET,
                                    socket.SO_REUSEADDR, 1)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except Exception:
            # Do not leak the socket when it cannot be set up.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is closed or
            # finalized.
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Wait for a connection and return it as a `Connection` that owns a
        duplicate of the accepted socket's descriptor.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            fd = duplicate(s.fileno())
        finally:
            # The duplicate keeps the connection alive; the socket object
            # is closed whether or not duplication succeeded.
            s.close()
        return Connection(fd)

    def close(self):
        '''
        Close the listening socket and, for AF_UNIX, remove its file.
        '''
        try:
            self._socket.close()
        finally:
            if self._unlink is not None:
                self._unlink()
556
557
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    A refused connection is retried every 10ms until the connection
    timeout (see `_init_timeout()`) expires; any other socket error, or a
    refusal after the deadline, is logged and re-raised.
    '''
    family = address_type(address)
    with socket.socket(getattr(socket, family)) as s:
        t = _init_timeout()

        # The loop exits only through `break` (connected) or `raise`.
        while True:
            try:
                s.connect(address)
            except socket.error as e:
                if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
                    debug('failed to connect to address %s', address)
                    raise
                time.sleep(0.01)
            else:
                break

        # Duplicate the descriptor before the `with` block closes `s`.
        fd = duplicate(s.fileno())
    conn = Connection(fd)
    return conn
582
583#
584# Definitions for connections based on named pipes
585#
586
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            # `backlog` is accepted but not used by this class.
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            # Pipe instances created but not yet handed out by accept().
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # close() is the finalizer itself: calling it closes every
            # handle still in the queue.
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            '''
            Wait for a client and return a `PipeConnection` for it.
            '''
            # Create the next pipe instance before serving the oldest one,
            # so the queue always holds an instance for the next client.
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # A client that connected before this call is not an error.
                if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                    raise
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Timeouts and busy-pipe errors are retried until the connection
        timeout (see `_init_timeout()`) expires; other errors propagate.
        '''
        t = _init_timeout()
        # The loop exits only through `break` (connected) or `raise`.
        while True:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000660
661#
662# Authentication stuff
663#
664
# Number of random bytes in each challenge.
MESSAGE_LENGTH = 20

# Protocol markers exchanged during the handshake.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends a random challenge and expects its HMAC-MD5 digest keyed with
    `authkey` in return.  Replies WELCOME on success; on failure replies
    FAILURE and raises AuthenticationError.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 is named explicitly: it is the digest the wire protocol has
    # always used, and hmac no longer provides a default digest.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison of the peer-supplied digest.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by the peer's `deliver_challenge()`.

    Raises AuthenticationError when the incoming message is not a
    challenge or when the peer rejects the digest.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = connection.recv_bytes(256)         # reject large message
    # The message comes from the peer, so check it with a real test
    # rather than an assert (asserts are stripped under -O).
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
695
696#
697# Support for using xmlrpclib for serialization
698#
699
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use the given `dumps`/`loads`
    callables for serialization instead of the connection's own.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the byte-level API of the wrapped connection directly.
        for name in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        payload = self._dumps(obj)
        self._conn.send_bytes(payload)

    def recv(self):
        payload = self._conn.recv_bytes()
        return self._loads(payload)
714
def _xml_dumps(obj):
    '''
    Serialize `obj` as a UTF-8 encoded XML-RPC parameter block
    (None values are allowed).
    '''
    # Imported here so the function works on its own, without relying on
    # XmlClient()/XmlListener.accept() having set a module global first.
    import xmlrpc.client
    return xmlrpc.client.dumps((obj,), None, None, None, 1).encode('utf-8')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000717
def _xml_loads(s):
    '''
    Decode a UTF-8 XML-RPC parameter block holding exactly one value and
    return that value.
    '''
    # Imported here so the function works on its own, without relying on
    # XmlClient()/XmlListener.accept() having set a module global first.
    import xmlrpc.client
    (obj,), method = xmlrpc.client.loads(s.decode('utf-8'))
    return obj
721
class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects with XML-RPC.
    '''
    def accept(self):
        # Bind the module-global `xmlrpclib` on first use.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        accepted = Listener.accept(self)
        wrapped = ConnectionWrapper(accepted, _xml_dumps, _xml_loads)
        return wrapped
728
def XmlClient(*args, **kwds):
    '''
    Like Client(), but the returned connection serializes objects with
    XML-RPC.  All arguments are passed through to Client().
    '''
    # Bind the module-global `xmlrpclib` on first use.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    connection = Client(*args, **kwds)
    return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200733
734
735# Late import because of circular import
736from multiprocessing.forking import duplicate, close